[DOP-6758] Fix Hive.check() behavior when Hive Metastore is not available
dolfinus committed Oct 6, 2023
1 parent 32c37ed commit 2acb021
Showing 6 changed files with 27 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/164.bugfix.rst
@@ -0,0 +1 @@
+Fix ``Hive.check()`` behavior when Hive Metastore is not available.
1 change: 1 addition & 0 deletions docs/changelog/next_release/164.improvement.rst
@@ -0,0 +1 @@
+Add check to all DB and FileDF connections that Spark session is alive.
12 changes: 11 additions & 1 deletion onetl/connection/db_connection/db_connection/connection.py
@@ -17,7 +17,7 @@
from logging import getLogger
from typing import TYPE_CHECKING

-from pydantic import Field
+from pydantic import Field, validator

from onetl._util.spark import try_import_pyspark
from onetl.base import BaseDBConnection
@@ -48,6 +48,16 @@ def _forward_refs(cls) -> dict[str, type]:
refs["SparkSession"] = SparkSession
return refs

@validator("spark")
def _check_spark_session_alive(cls, spark):
try:
spark.sql("SELECT 1").collect()
except Exception as e:
msg = "Spark session is stopped. Please recreate Spark session."
raise ValueError(msg) from e

return spark

def _log_parameters(self):
log.info("|%s| Using connection parameters:", self.__class__.__name__)
parameters = self.dict(exclude_none=True, exclude={"spark"})
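Note: the new ``@validator("spark")`` runs a trivial query when the connection object is constructed, so a dead Spark session now fails fast at creation time rather than later during a read or write. A minimal sketch of the new behavior (the ``Hive`` connection and cluster name are illustrative, taken from onetl's usual examples, not from this diff):

    from onetl.connection import Hive

    spark.stop()
    Hive(cluster="rnd-dwh", spark=spark)
    # raises ValueError: Spark session is stopped. Please recreate Spark session.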
4 changes: 2 additions & 2 deletions onetl/connection/db_connection/hive/connection.py
@@ -146,7 +146,7 @@ class Hive(DBConnection):
    # TODO: remove in v1.0.0
    slots = HiveSlots

-    _CHECK_QUERY: ClassVar[str] = "SELECT 1"
+    _CHECK_QUERY: ClassVar[str] = "SHOW DATABASES"

    @slot
    @classmethod
@@ -207,7 +207,7 @@ def check(self):
        log_lines(log, self._CHECK_QUERY, level=logging.DEBUG)

        try:
-            self._execute_sql(self._CHECK_QUERY)
+            self._execute_sql(self._CHECK_QUERY).limit(1).collect()
            log.info("|%s| Connection is available.", self.__class__.__name__)
        except Exception as e:
            log.exception("|%s| Connection is unavailable", self.__class__.__name__)
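Why both edits are needed: Spark evaluates ``SELECT 1`` without contacting the Hive Metastore, and ``_execute_sql`` only builds a lazy DataFrame, so the old check could pass while the Metastore was down. ``SHOW DATABASES`` forces a Metastore round-trip, and ``.limit(1).collect()`` forces the query to actually execute. A sketch of the difference, assuming a Hive-enabled Spark session:

    spark.sql("SELECT 1").collect()                 # succeeds even when the Metastore is down
    spark.sql("SHOW DATABASES").limit(1).collect()  # raises if the Metastore is unreachable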
1 change: 1 addition & 0 deletions onetl/connection/db_connection/mongodb/connection.py
@@ -507,6 +507,7 @@ def write_df_to_target(
        )

        if self._collection_exists(target):
+            # MongoDB connector does not support mode=ignore and mode=error
            if write_options.if_exists == MongoDBCollectionExistBehavior.ERROR:
                raise ValueError("Operation stopped due to MongoDB.WriteOptions(if_exists='error')")
            elif write_options.if_exists == MongoDBCollectionExistBehavior.IGNORE:
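The added comment explains why ``error`` and ``ignore`` are emulated before writing instead of being passed to the Spark connector. A hedged usage sketch (the ``DBWriter`` shape and variable names are assumed from onetl's public API, not shown in this diff):

    from onetl.db import DBWriter

    writer = DBWriter(
        connection=mongo,  # an existing MongoDB connection
        target="existing_collection",
        options=MongoDB.WriteOptions(if_exists="error"),
    )
    writer.run(df)  # raises ValueError: Operation stopped due to MongoDB.WriteOptions(if_exists='error')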
12 changes: 11 additions & 1 deletion onetl/connection/file_df_connection/spark_file_df_connection.py
@@ -19,7 +19,7 @@
from logging import getLogger
from typing import TYPE_CHECKING

-from pydantic import Field
+from pydantic import Field, validator

from onetl._util.hadoop import get_hadoop_config
from onetl._util.spark import try_import_pyspark
@@ -182,6 +182,16 @@ def _forward_refs(cls) -> dict[str, type]:
refs["SparkSession"] = SparkSession
return refs

@validator("spark")
def _check_spark_session_alive(cls, spark):
try:
spark.sql("SELECT 1").collect()
except Exception as e:
msg = "Spark session is stopped. Please recreate Spark session."
raise ValueError(msg) from e

return spark

def _log_parameters(self):
log.info("|%s| Using connection parameters:", self.__class__.__name__)
parameters = self.dict(exclude_none=True, exclude={"spark"})
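The validator is duplicated here so that FileDF connections get the same fail-fast guarantee described in the improvement changelog entry above. A one-line sketch (``SparkLocalFS`` is used as an assumed example of a FileDF connection):

    from onetl.connection import SparkLocalFS

    spark.stop()
    SparkLocalFS(spark=spark)  # raises ValueError: Spark session is stopped. Please recreate Spark session.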
