Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

vdk-impala: introduce new error handling #2759

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor
from vdk.internal.core import errors
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.impala.impala_memory_error_handler import ImpalaMemoryErrorHandler

MEMORY_LIMIT_PATTERN = r"Limit=(\d+\.\d+)\s*([KMGTP]B)"
Expand Down Expand Up @@ -57,16 +58,14 @@ def handle_error(
return False

if self._is_pool_error(caught_exception):
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened="An Impala Pool Error occurred: " + str(caught_exception),
why_it_happened="Review the contents of the exception.",
consequences="The queries will not be executed.",
countermeasures=(
errors.report_and_throw(
UserCodeError(
"An Impala Pool Error occurred: " + str(caught_exception),
"Review the contents of the exception.",
"The queries will not be executed.",
"Optimise the executed queries. Alternatively, make sure that "
"the data job is not running too many queries in parallel."
),
"the data job is not running too many queries in parallel.",
)
)

is_handled = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pyarrow
from vdk.internal.core import errors
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.impala import impala_error_classifier
from vdk.plugin.impala.impala_connection import ImpalaConnection

Expand All @@ -20,16 +21,13 @@ def get_table_description(self, table_name):
return self._db_connection.execute_query(f"DESCRIBE formatted {table_name}")
except Exception as e:
if impala_error_classifier._is_authorization_error(e):
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened=f"Data loading into table {table_name} has failed.",
why_it_happened=(
errors.report_and_rethrow(
UserCodeError(
f"Data loading into table {table_name} has failed.",
f"You are trying to load data into a table which you do not have access to or it does not "
f"exist: {table_name}."
),
consequences="Data load will be aborted.",
countermeasures="Make sure that the destination table exists and you have access to it.",
f"exist: {table_name}. Data load will be aborted.",
"Make sure that the destination table exists and you have access to it.",
)
)
else:
raise e
Expand Down Expand Up @@ -93,30 +91,26 @@ def ensure_table_format_is_parquet(self, table_name, table_description):
if "parquet" in value: # table is stored as parquet
return
else: # table is not stored as parquet
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=self._log,
what_happened="Data loading has failed.", # FIXME: this is too specific
why_it_happened=(
errors.report_and_throw(
UserCodeError(
"Data loading has failed.", # FIXME: this is too specific
f"You are trying to load data into a table {table_name} with an unsupported format. "
f"Currently only Parquet table format is supported."
),
consequences="Data load will be aborted.", # FIXME: this is too specific
countermeasures=(
f"Data load will be aborted.", # FIXME: this is too specific
"Make sure that the destination table is stored as parquet: "
"https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
"#parquet_ddl"
"#parquet_ddl",
),
)
# TODO: once more robust loading is implemented, the error below can be removed; we can try to load even if
# we cannot determine the table storage type.
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR,
log=self._log,
what_happened="Cannot determine the target table file format, which is needed to load data into it.",
why_it_happened="There's a bug in VDK code.",
consequences="Application will exit.",
countermeasures="Report this bug to versatile data kit team.",
errors.report_and_throw(
errors.PlatformServiceError(
"Cannot determine the target table file format, which is needed to load data into it.",
"There's a bug in VDK code.",
"Application will exit.",
"Report this bug to Versatile Data Kit team.",
)
)

def generate_parquet_schema_from_table_schema(self, table_columns):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from vdk.api.job_input import IJobInput
from vdk.internal.builtin_plugins.run.job_input import JobInput
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.plugin.impala.impala_helper import ImpalaHelper

log = getLogger(__name__)
Expand Down Expand Up @@ -57,12 +58,4 @@ def _validate_args(self, args: dict) -> dict:
try:
return self.TemplateParams(**args).dict()
except ValidationError as error:
errors.log_and_rethrow(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=log,
what_happened="Template execution in Data Job finished with error",
duyguHsnHsn marked this conversation as resolved.
Show resolved Hide resolved
why_it_happened=errors.MSG_WHY_FROM_EXCEPTION(error),
consequences=errors.MSG_CONSEQUENCE_TERMINATING_APP,
countermeasures=errors.MSG_COUNTERMEASURE_FIX_PARENT_EXCEPTION,
exception=error,
)
errors.report_and_rethrow(ResolvableBy.USER_ERROR, error)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.impala import impala_plugin
from vdk.plugin.test_utils.util_funcs import cli_assert
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
Expand Down Expand Up @@ -633,22 +634,17 @@ def _run_template_with_bad_arguments(
f"for {template_name} template"
)

test_exception = Exception(expected_error_regex)

def just_rethrow(*_, **kwargs):
raise Exception(expected_error_regex)
raise test_exception

with patch.object(errors, "log_and_rethrow") as patched_log_and_rethrow:
patched_log_and_rethrow.side_effect = just_rethrow
with patch.object(errors, "report_and_rethrow") as patched_report_and_rethrow:
patched_report_and_rethrow.side_effect = just_rethrow
result = self._run_job(template_name, template_args)
assert expected_error_regex in result.output, result.output
assert errors.log_and_rethrow.call_args[1]["what_happened"], result.output
assert (
f"{num_exp_errors} validation error"
in errors.log_and_rethrow.call_args[1]["why_it_happened"]
or f"{num_exp_errors}\\ validation\\ error"
in errors.log_and_rethrow.call_args[1]["why_it_happened"]
), result.output
assert errors.log_and_rethrow.call_args[1]["consequences"], result.output
assert errors.log_and_rethrow.call_args[1]["countermeasures"], result.output
actual_args, actual_kwargs = errors.report_and_rethrow.call_args
assert str(actual_args) == str((ResolvableBy.USER_ERROR, test_exception))

def _run_template_with_bad_target_schema(
self, template_name: str, template_args: dict
Expand Down Expand Up @@ -686,29 +682,28 @@ def _run_template_with_bad_target_schema(
f"clause. Please change the table definition accordingly and re-create the table."
)

expected_error = UserCodeError(
"Data loading has failed.", # FIXME: this is too specific
f"You are trying to load data into a table {table_name} with an unsupported format. "
f"Currently only Parquet table format is supported."
f"Data load will be aborted.", # FIXME: this is too specific
"Make sure that the destination table is stored as parquet: "
"https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
"#parquet_ddl",
)

def just_throw(*_, **kwargs):
raise Exception(expected_why_it_happened_msg)

with patch(
"vdk.internal.core.errors.log_and_throw", MagicMock(side_effect=just_throw)
"vdk.internal.core.errors.report_and_throw",
MagicMock(side_effect=just_throw),
):
res = self._run_job(template_name, template_args)
assert expected_why_it_happened_msg in res.output
errors.log_and_throw.assert_called_once_with(
to_be_fixed_by=ResolvableBy.USER_ERROR,
log=ANY,
what_happened="Data loading has failed.",
why_it_happened=(
f"You are trying to load data into a table {table_name} with an unsupported format. "
f"Currently only Parquet table format is supported."
),
consequences="Data load will be aborted.",
countermeasures=(
"Make sure that the destination table is stored as parquet: "
"https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
"#parquet_ddl"
),
)
actual_args, actual_kwargs = errors.report_and_throw.call_args
actual_message = actual_args[0]
assert str(actual_message) == expected_error.__str__()

def test_insert(self) -> None:
test_schema = "vdkprototypes"
Expand Down