
[DOP-6758] Fix Hive.check() behavior when Hive Metastore is not available
dolfinus committed Oct 6, 2023
1 parent 32c37ed commit 0ddd294
Showing 7 changed files with 27 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/164.bugfix.rst
@@ -0,0 +1 @@
+Fix ``Hive.check()`` behavior when Hive Metastore is not available.
1 change: 1 addition & 0 deletions docs/changelog/next_release/164.improvement.rst
@@ -0,0 +1 @@
+Add check to all DB and FileDF connections that Spark session is alive.
12 changes: 11 additions & 1 deletion onetl/connection/db_connection/db_connection/connection.py
@@ -17,7 +17,7 @@
 from logging import getLogger
 from typing import TYPE_CHECKING

-from pydantic import Field
+from pydantic import Field, validator

 from onetl._util.spark import try_import_pyspark
 from onetl.base import BaseDBConnection
@@ -48,6 +48,16 @@ def _forward_refs(cls) -> dict[str, type]:
refs["SparkSession"] = SparkSession
return refs

@validator("spark")
def _check_spark_session_alive(cls, spark):
try:
spark.sql("SELECT 1").collect()
except Exception as e:
msg = "Spark session is stopped. Please recreate Spark session."
raise ValueError(msg) from e

+        return spark

     def _log_parameters(self):
         log.info("|%s| Using connection parameters:", self.__class__.__name__)
         parameters = self.dict(exclude_none=True, exclude={"spark"})
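Editor's note: the validator runs a trivial query when the connection object is constructed, so a stopped session fails fast instead of surfacing later inside a read or write. A minimal sketch of the resulting behavior, assuming a local Spark session and no registered cluster-validation hooks (the cluster name is taken from the tests below; any connection class behaves the same way):

    from pyspark.sql import SparkSession

    from onetl.connection import Hive

    spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()
    spark.stop()

    # Constructing any DB connection now runs spark.sql("SELECT 1").collect()
    # through the @validator("spark") above, so the dead session is caught here.
    # pydantic wraps the raised ValueError in a ValidationError, which is
    # itself a ValueError subclass.
    try:
        Hive(cluster="rnd-dwh", spark=spark)
    except ValueError as e:
        print(e)  # includes: Spark session is stopped. Please recreate Spark session.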
4 changes: 2 additions & 2 deletions onetl/connection/db_connection/hive/connection.py
@@ -146,7 +146,7 @@ class Hive(DBConnection):
     # TODO: remove in v1.0.0
     slots = HiveSlots

-    _CHECK_QUERY: ClassVar[str] = "SELECT 1"
+    _CHECK_QUERY: ClassVar[str] = "SHOW DATABASES"

     @slot
     @classmethod
@@ -207,7 +207,7 @@ def check(self):
         log_lines(log, self._CHECK_QUERY, level=logging.DEBUG)

         try:
-            self._execute_sql(self._CHECK_QUERY)
+            self._execute_sql(self._CHECK_QUERY).limit(1).collect()
             log.info("|%s| Connection is available.", self.__class__.__name__)
         except Exception as e:
             log.exception("|%s| Connection is unavailable", self.__class__.__name__)
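Editor's note: two things changed in this check, the query and the fact that it is executed. `SELECT 1` is a constant query that Spark can plan without consulting the Hive Metastore, and the old code never materialized the resulting DataFrame, so check() could report success with the Metastore down. `SHOW DATABASES` has to list databases from the Metastore, and limit(1).collect() forces execution while fetching at most one row. A hedged sketch of the difference, assuming _execute_sql delegates to spark.sql and a Hive-enabled session:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Old check: lazily builds a DataFrame and needs no Metastore metadata,
    # so it cannot detect an unreachable Metastore.
    spark.sql("SELECT 1")

    # New check: listing databases requires a Metastore round-trip, and
    # limit(1).collect() forces execution while moving at most one row.
    spark.sql("SHOW DATABASES").limit(1).collect()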
1 change: 1 addition & 0 deletions onetl/connection/db_connection/mongodb/connection.py
@@ -507,6 +507,7 @@ def write_df_to_target(
         )

         if self._collection_exists(target):
+            # MongoDB connector does not support mode=ignore and mode=error
             if write_options.if_exists == MongoDBCollectionExistBehavior.ERROR:
                 raise ValueError("Operation stopped due to MongoDB.WriteOptions(if_exists='error')")
             elif write_options.if_exists == MongoDBCollectionExistBehavior.IGNORE:
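Editor's note: the added comment records why the existence check lives in onetl at all: the Spark MongoDB connector only implements append/overwrite save modes, so if_exists='error' and if_exists='ignore' are emulated by probing the target collection before delegating to the connector. A hedged usage sketch (host, credentials, spark, and df are placeholders, not values from this commit):

    from onetl.connection import MongoDB
    from onetl.db import DBWriter

    mongo = MongoDB(
        host="mongo.example.com",  # placeholder connection details
        user="onetl",
        password="***",
        database="demo",
        spark=spark,  # an existing SparkSession with the MongoDB connector package
    )

    writer = DBWriter(
        connection=mongo,
        target="mycollection",
        # Emulated by onetl via _collection_exists(), not by the Spark connector:
        options=MongoDB.WriteOptions(if_exists="error"),
    )
    writer.run(df)  # raises ValueError if "mycollection" already exists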
12 changes: 11 additions & 1 deletion onetl/connection/file_df_connection/spark_file_df_connection.py
@@ -19,7 +19,7 @@
 from logging import getLogger
 from typing import TYPE_CHECKING

-from pydantic import Field
+from pydantic import Field, validator

 from onetl._util.hadoop import get_hadoop_config
 from onetl._util.spark import try_import_pyspark
@@ -182,6 +182,16 @@ def _forward_refs(cls) -> dict[str, type]:
refs["SparkSession"] = SparkSession
return refs

@validator("spark")
def _check_spark_session_alive(cls, spark):
try:
spark.sql("SELECT 1").collect()
except Exception as e:
msg = "Spark session is stopped. Please recreate Spark session."
raise ValueError(msg) from e

+        return spark

     def _log_parameters(self):
         log.info("|%s| Using connection parameters:", self.__class__.__name__)
         parameters = self.dict(exclude_none=True, exclude={"spark"})
2 changes: 0 additions & 2 deletions tests/tests_unit/tests_db_connection_unit/test_hive_unit.py
@@ -60,8 +60,6 @@ def normalize_cluster_name(cluster: str) -> str:


 def test_hive_known_get_current_cluster_hook(request, spark_mock, mocker):
-    mocker.patch.object(Hive, "_execute_sql", return_value=None)
-
     # no exception
     Hive(cluster="rnd-prod", spark=spark_mock).check()
     Hive(cluster="rnd-dwh", spark=spark_mock).check()
