From 4fbf12b957f11007474da7cdb22dd7ea2f0b6362 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 2 Aug 2024 12:46:04 +0200 Subject: [PATCH] Fix notification polyfill from incomplete GQL status (#1080) When connected to a server sending GQL statuses instead of notifications, the driver polyfills `summary.summary_notifications` from the GQL statuses. This PR fixes a but where the driver would ignore any status that was lacking any of the possible fields. However, at least the position is optional and should not cause the status to be swallowed. --- src/neo4j/_work/summary.py | 76 ++- tests/unit/common/work/test_summary.py | 701 ++++++++++++++++++------- 2 files changed, 541 insertions(+), 236 deletions(-) diff --git a/src/neo4j/_work/summary.py b/src/neo4j/_work/summary.py index 3029c53f..5c81478d 100644 --- a/src/neo4j/_work/summary.py +++ b/src/neo4j/_work/summary.py @@ -139,6 +139,33 @@ def __getattr__(self, key): f"'{self.__class__.__name__}' object has no attribute '{key}'" ) + @staticmethod + def _notification_from_status(status: dict) -> dict: + notification = {} + for notification_key, status_key in ( + ("title", "title"), + ("code", "neo4j_code"), + ("description", "description"), + ): + if status_key in status: + notification[notification_key] = status[status_key] + + if "diagnostic_record" in status: + diagnostic_record = status["diagnostic_record"] + if not isinstance(diagnostic_record, dict): + diagnostic_record = {} + + for notification_key, diag_record_key in ( + ("severity", "_severity"), + ("category", "_classification"), + ("position", "_position"), + ): + if diag_record_key in diagnostic_record: + notification[notification_key] = \ + diagnostic_record[diag_record_key] + + return notification + def _set_notifications(self): if "notifications" in self.metadata: notifications = self.metadata["notifications"] @@ -156,53 +183,10 @@ def _set_notifications(self): return notifications = [] for status in statuses: - if not isinstance(status, dict): - continue - notification = {} - failed = False - for notification_key, status_key in ( - ("title", "title"), - ("code", "neo4j_code"), - ("description", "description"), - ): - value = status.get(status_key) - if not isinstance(value, str) or not value: - failed = True - break - notification[notification_key] = value - if failed: - continue - diagnostic_record = status.get("diagnostic_record") - if not isinstance(diagnostic_record, dict): - continue - for notification_key, diag_record_key in ( - ("severity", "_severity"), - ("category", "_classification"), - ): - value = diagnostic_record.get(diag_record_key) - if not isinstance(value, str) or not value: - failed = True - break - notification[notification_key] = value - if failed: - continue - raw_position = diagnostic_record.get("_position") - if not isinstance(raw_position, dict): - continue - position = {} - for pos_key, raw_pos_key in ( - ("line", "line"), - ("column", "column"), - ("offset", "offset"), - ): - value = raw_position.get(raw_pos_key) - if not isinstance(value, int) or isinstance(value, bool): - failed = True - break - position[pos_key] = value - if failed: + if not (isinstance(status , dict) and "neo4j_code" in status): + # not a notification status continue - notification["position"] = position + notification = self._notification_from_status(status) notifications.append(notification) self.notifications = notifications return diff --git a/tests/unit/common/work/test_summary.py b/tests/unit/common/work/test_summary.py index bb361071..88ea56cf 100644 --- a/tests/unit/common/work/test_summary.py +++ b/tests/unit/common/work/test_summary.py @@ -23,6 +23,9 @@ if t.TYPE_CHECKING: import typing_extensions as te + _T = t.TypeVar("_T") + _TDict = t.TypeVar("_TDict", bound=dict) + import pytest from neo4j import ( @@ -1499,7 +1502,7 @@ def test_no_notification_from_status(raw_status, summary_args_kwargs) -> None: ), # _position maps to position - # * extra fields are ignored + # * extra fields are copied to raw data ( {}, {"_position": {"line": 1234, "column": 0, "offset": -9999}}, @@ -1512,11 +1515,17 @@ def test_no_notification_from_status(raw_status, summary_args_kwargs) -> None: ), ( {}, - {"_position": - {"line": 1234, "column": 0, "offset": -9999, "extra": "hi :)"} + { + "_position": { + "line": 1234, "column": 0, "offset": -9999, + "extra": "hi :)" + } }, { - "raw_position": {"line": 1234, "column": 0, "offset": -9999}, + "raw_position": { + "line": 1234, "column": 0, "offset": -9999, + "extra": "hi :)" + }, "position": SummaryInputPosition( line=1234, column=0, offset=-9999 ), @@ -1632,6 +1641,338 @@ def test_notification_from_status( == expected_overwrite.get("position", default_position)) +@pytest.mark.parametrize( + "status_in", + ( + None, + 1, + False, + ["neo4j_code"], + "neo4j_code" + ) +) +def test_no_notification_from_wrong_type_status( + status_in, summary_args_kwargs +) -> None: + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [status_in] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [] + assert summary_notifications == [] + + +def _get_from_dict( + dict_: _TDict, + key: t.Sequence, + default: t.Any = None, +) -> t.Any: + if not key: + raise ValueError("key must not be empty") + target = dict_ + for key_ in key[:-1]: + if key_ not in target: + return default + target_any = target[key_] + if not isinstance(target, dict): + return default + target = target_any + return target.get(key[-1], default) + + +def _del_from_dict( + dict_: _TDict, + del_key: t.Sequence, +) -> _TDict: + if not del_key: + raise ValueError("del_key must not be empty") + target = dict_ + for i, key in enumerate(del_key[:-1]): + if not isinstance(target.get(key), dict): + raise TypeError( + f"Expected dict at {del_key!r}[{i}], got {target!r}" + ) + target = target[key] + del target[del_key[-1]] + + return dict_ + + +def _set_in_dict( + dict_: _TDict, + set_key: t.Sequence, + set_value: t.Any, + *, + create_if_missing: bool = True, +) -> _TDict: + if not set_key: + raise ValueError("set_key must not be empty") + target = dict_ + for i, key in enumerate(set_key[:-1]): + if create_if_missing: + target = target.setdefault(key, {}) + else: + if key not in target: + return dict_ + target = target[key] + if not isinstance(target, dict): + raise TypeError( + f"Expected dict at {set_key!r}[{i}], got {target!r}" + ) + + if not create_if_missing and set_key[-1] not in target: + return dict_ + target[set_key[-1]] = set_value + + return dict_ + + +def _make_raw_status_obj( + *, + del_keys: t.Iterable[t.Tuple[str, ...]] = (), + replace: t.Optional[t.Dict[t.Tuple[str, ...], t.Any]] = None +) -> t.Dict[str, t.Any]: + raw_status: t.Dict[str, t.Any] = { + "gql_status": "03BAZ", + "status_description": "note: successful completion - custom stuff", + "description": "nice message", + "neo4j_code": "Neo.Foo.Bar.Baz", + "title": "Some cool title which defo is dope!", + "diagnostic_record": { + "OPERATION": "", + "OPERATION_CODE": "0", + "CURRENT_SCHEMA": "/", + "_status_parameters": {}, + "_severity": "INFORMATION", + "_classification": "HINT", + "_position": { + "line": 1337, + "column": 42, + "offset": 420, + }, + } + } + + if replace is None: + replace = {} + for key, value in replace.items(): + _set_in_dict(raw_status, key, value) + + for del_key in del_keys: + _del_from_dict(raw_status, del_key) + + return raw_status + + +def _make_raw_notification_obj( + *, + del_keys: t.Iterable[t.Tuple[str, ...]] = (), + replace: t.Optional[t.Dict[t.Tuple[str, ...], t.Any]] = None +) -> t.Dict[str, t.Any]: + raw_notification_obj: t.Dict[str, t.Any] = { + "code": "Neo.Foo.Bar.Baz", + "description": "nice message", + "title": "Some cool title which defo is dope!", + "severity": "INFORMATION", + "category": "HINT", + "position": { + "line": 1337, + "column": 42, + "offset": 420, + }, + } + + key_translation: t.Dict[ + t.Tuple[str, ...], + t.Tuple[t.Tuple[str, ...], ...] + ] = { + ("neo4j_code",): ( + ("code",), + ), + ("description",): ( + ("description",), + ), + ("title",): ( + ("title",), + ), + ("diagnostic_record", "_severity"): ( + ("severity",), + ), + ("diagnostic_record",): ( + ("severity",), + ("category",), + ("position",), + ), + ("diagnostic_record", "_classification"): ( + ("category",), + ), + ("diagnostic_record", "_position"): ( + ("position",), + ), + } + + # dicts the driver will copy 1-to-1 including unknown keys + copied_dicts = ( + ("diagnostic_record", "_position"), + ) + + del_keys_list = list(del_keys) + + if replace is None: + replace = {} + for key, value in replace.items(): + create_if_missing = False + translated_keys = key_translation.get(key, ()) + if not translated_keys: + for copied_dict in copied_dicts: + if key[:len(copied_dict)] == copied_dict: + create_if_missing = True + translated_keys = tuple( + (*k, *key[len(copied_dict):]) + for k in key_translation[copied_dict] + ) + break + else: + continue + if len(translated_keys) > 1: + # original key points to dict/list + # => cannot be translated by the driver's polyfill + # => expect it to not be present in the polyfilled value + del_keys_list.append(key) + continue + translated_key = translated_keys[0] + _set_in_dict( + raw_notification_obj, + translated_key, + value, + # position is copied as is + create_if_missing=create_if_missing + ) + + for key in del_keys_list: + translated_keys = key_translation.get(key, ()) + if not translated_keys: + for copied_dict in copied_dicts: + if key[:len(copied_dict)] == copied_dict: + translated_keys = tuple( + (*k, *key[len(copied_dict):]) + for k in key_translation[copied_dict] + ) + break + else: + continue + for translated_key in translated_keys: + _del_from_dict(raw_notification_obj, translated_key) + + return raw_notification_obj + + +def test_no_notification_from_status_without_neo4j_code( + summary_args_kwargs +) -> None: + raw_status = _make_raw_status_obj(del_keys=(("neo4j_code",),)) + + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [raw_status] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [] + assert summary_notifications == [] + + +@pytest.mark.parametrize( + "del_key", + ( + ("gql_status",), + ("status_description",), + ("description",), + ("title",), + ("diagnostic_record",), + ("diagnostic_record", "OPERATION"), + ("diagnostic_record", "OPERATION_CODE"), + ("diagnostic_record", "CURRENT_SCHEMA"), + ("diagnostic_record", "_status_parameters"), + ("diagnostic_record", "_severity"), + ("diagnostic_record", "_classification"), + ("diagnostic_record", "_position"), + ("diagnostic_record", "_position", "line"), + ("diagnostic_record", "_position", "column"), + ("diagnostic_record", "_position", "offset"), + ) +) +def test_notification_from_incomplete_status( + del_key: t.Tuple[str, ...], summary_args_kwargs +) -> None: + raw_status = _make_raw_status_obj(del_keys=(del_key,)) + raw_notification = _make_raw_notification_obj(del_keys=(del_key,)) + + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [raw_status] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [raw_notification] + + assert summary_notifications == [ + SummaryNotification._from_metadata(raw_notification) + ] + + +@pytest.mark.parametrize( + "set_key", + ( + ("neo4j_code",), + ("gql_status",), + ("status_description",), + ("description",), + ("title",), + ("",), + ("diagnostic_record",), + ("diagnostic_record", "OPERATION"), + ("diagnostic_record", "OPERATION_CODE"), + ("diagnostic_record", "CURRENT_SCHEMA"), + ("diagnostic_record", "_status_parameters"), + ("diagnostic_record", "_severity"), + ("diagnostic_record", "_classification"), + ("diagnostic_record", ""), + ("diagnostic_record", "_position"), + ("diagnostic_record", "_position", "line"), + ("diagnostic_record", "_position", "column"), + ("diagnostic_record", "_position", "offset"), + ("diagnostic_record", "_position", ""), + ) +) +@pytest.mark.parametrize("set_value", + (None, 1, 3.14159, False, [], {}, "", "hi :)")) +def test_notification_from_unexpected_status( + set_key: t.Tuple[str, ...], set_value: t.Any, summary_args_kwargs, +) -> None: + replace = {set_key: set_value} + raw_status = _make_raw_status_obj(replace=replace) + raw_notification = _make_raw_notification_obj(replace=replace) + + args, kwargs = summary_args_kwargs + kwargs["metadata"]["statuses"] = [raw_status] + + summary = ResultSummary(*args, **kwargs) + notifications = summary.notifications + summary_notifications = summary.summary_notifications + + assert notifications == [raw_notification] + + assert summary_notifications == [ + SummaryNotification._from_metadata(raw_notification) + ] + + def _test_status(): return { "gql_status": "12345", @@ -1673,60 +2014,10 @@ def _test_status_broken_key(key_path, value): ( None, 1, + False, {}, + [], True, - _test_status_missing_key(("status_description",)), - _test_status_missing_key(("neo4j_code",)), - _test_status_missing_key(("title",)), - _test_status_missing_key(("diagnostic_record",)), - _test_status_missing_key(("diagnostic_record", "_severity")), - _test_status_missing_key(("diagnostic_record", "_classification")), - _test_status_missing_key(("diagnostic_record", "_position")), - _test_status_missing_key(("diagnostic_record", "_position", "line")), - _test_status_missing_key(("diagnostic_record", "_position", "column")), - _test_status_missing_key(("diagnostic_record", "_position", "offset")), - - _test_status_broken_key(("status_description",), - None), - _test_status_broken_key(("neo4j_code",), - None), - _test_status_broken_key(("title",), - None), - _test_status_broken_key(("diagnostic_record",), - None), - _test_status_broken_key(("diagnostic_record", "_severity"), - None), - _test_status_broken_key(("diagnostic_record", "_classification"), - None), - _test_status_broken_key(("diagnostic_record", "_position"), - None), - _test_status_broken_key(("diagnostic_record", "_position", "line"), - None), - _test_status_broken_key(("diagnostic_record", "_position", "column"), - None), - _test_status_broken_key(("diagnostic_record", "_position", "offset"), - None), - - _test_status_broken_key(("status_description",), - False), - _test_status_broken_key(("neo4j_code",), - False), - _test_status_broken_key(("title",), - False), - _test_status_broken_key(("diagnostic_record",), - False), - _test_status_broken_key(("diagnostic_record", "_severity"), - False), - _test_status_broken_key(("diagnostic_record", "_classification"), - False), - _test_status_broken_key(("diagnostic_record", "_position"), - False), - _test_status_broken_key(("diagnostic_record", "_position", "line"), - False), - _test_status_broken_key(("diagnostic_record", "_position", "column"), - False), - _test_status_broken_key(("diagnostic_record", "_position", "offset"), - False), ) ) def test_notification_from_broken_status( @@ -1742,140 +2033,6 @@ def test_notification_from_broken_status( assert notifications == [] -@pytest.mark.parametrize( - "in_status", - ( - _test_status_missing_key(("diagnostic_record",)), - - _test_status_broken_key(("diagnostic_record",), None), - _test_status_broken_key(("diagnostic_record",), 1), - _test_status_broken_key(("diagnostic_record",), True), - _test_status_broken_key(("diagnostic_record",), [None]), - ) -) -def test_broken_diagnostic_record(in_status, summary_args_kwargs) -> None: - args, kwargs = summary_args_kwargs - kwargs["metadata"]["statuses"] = [in_status] - - summary = ResultSummary(*args, **kwargs) - - with pytest.warns(PreviewWarning, match="GQLSTATUS"): - statuses = summary.gql_status_objects - assert len(statuses) == 1 - status = statuses[0] - - assert status.diagnostic_record == {} - assert status.position is None - assert status.raw_classification is None - assert status.classification is NotificationClassification.UNKNOWN - assert status.raw_severity is None - assert status.severity is NotificationSeverity.UNKNOWN - - -@pytest.mark.parametrize( - ("status_overwrite", "diagnostic_record_overwrite"), - ( - *( - ({"description": value}, {}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({"title": value}, {}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({}, {"_severity": value}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({}, {"_classification": value}) - for value in t.cast(t.Iterable[t.Any], - ("", None, ..., 1, False, [], {})) - ), - *( - ({}, {"_position": value}) - for value in t.cast( - t.Iterable[t.Any], ( - None, ..., 1, False, [], {}, - {"column": 1, "offset": 1}, - {"line": 1, "offset": 1}, - {"line": 1, "column": 1}, - {"line": None, "column": 1, "offset": 1}, - {"line": 1, "column": None, "offset": 1}, - {"line": 1, "column": 1, "offset": None}, - {"line": False, "column": 1, "offset": 1}, - {"line": 1, "column": False, "offset": 1}, - {"line": 1, "column": 1, "offset": False}, - {"line": [], "column": 1, "offset": 1}, - {"line": 1, "column": [], "offset": 1}, - {"line": 1, "column": 1, "offset": []}, - {"line": 1.0, "column": 1, "offset": 1}, - {"line": 1, "column": 1.0, "offset": 1}, - {"line": 1, "column": 1, "offset": 1.0}, - {"line": "1", "column": 1, "offset": 1}, - {"line": 1, "column": "1", "offset": 1}, - {"line": 1, "column": 1, "offset": "1"}, - ) - ) - ), - ) -) -def test_no_notification_from_broken_status( - status_overwrite, diagnostic_record_overwrite, summary_args_kwargs -) -> None: - default_status = "03BAZ" - default_status_description = "note: successful completion - custom stuff" - default_description = "some description" - default_code = "Neo.Foo.Bar.Baz" - default_title = "Some cool title which defo is dope!" - default_severity = "INFORMATION" - default_classification = "HINT" - default_position = SummaryInputPosition(line=1337, column=42, offset=420) - raw_status_obj: t.Dict[str, t.Any] = { - "gql_status": default_status, - "description": default_description, - "status_description": default_status_description, - "neo4j_code": default_code, - "title": default_title, - "diagnostic_record": { - "OPERATION": "", - "OPERATION_CODE": "0", - "CURRENT_SCHEMA": "/", - "_status_parameters": {}, - "_severity": default_severity, - "_classification": default_classification, - "_position": { - "line": default_position.line, - "column": default_position.column, - "offset": default_position.offset, - }, - } - } - for key, value in diagnostic_record_overwrite.items(): - if value is ...: - del raw_status_obj["diagnostic_record"][key] - else: - raw_status_obj["diagnostic_record"][key] = value - for key, value in status_overwrite.items(): - if value is ...: - del raw_status_obj[key] - else: - raw_status_obj[key] = value - - args, kwargs = summary_args_kwargs - kwargs["metadata"]["statuses"] = [raw_status_obj] - - summary = ResultSummary(*args, **kwargs) - notifications = summary.notifications - summary_notifications = summary.summary_notifications - - assert notifications == [] - assert summary_notifications == [] - - def test_notifications_from_statuses_keep_order( summary_args_kwargs, ) -> None: @@ -1912,3 +2069,167 @@ def test_notifications_from_statuses_keep_order( 3, "INFORMATION") helper.assert_parsed_notification_is_status(summary_notifications[3], 1, "WARNING") + + +def _make_summary_notification( + overwrite: t.Optional[t.Dict[str, t.Any]] = None +) -> SummaryNotification: + if overwrite is None: + overwrite = {} + return SummaryNotification( + title=overwrite.get("title", ""), + code=overwrite.get("code", ""), + description=overwrite.get("description", ""), + severity_level=overwrite.get( + "severity_level", NotificationSeverity.UNKNOWN + ), + category=overwrite.get( + "category", NotificationCategory.UNKNOWN + ), + raw_severity_level=overwrite.get("raw_severity_level", ""), + raw_category=overwrite.get("raw_category", ""), + position=overwrite.get("position") + ) + + +@pytest.mark.parametrize( + ("meta", "expected"), + ( + # empty meta data + ({}, _make_summary_notification()), + + # all string fields + *( + ( + {key: str_value}, + _make_summary_notification({key: str_value}) + ) + for key in ("title", "code", "description") + for str_value in ("", " ", "Foo Bar Baz") + ), + *( + ( + {key: title}, + _make_summary_notification() + ) + for key in ("title", "code", "description") + for title in t.cast(t.Iterable, + (True, False, 1, [], {}, 1.2345, None)) + ), + + # severity + *( + ( + {"severity": raw_value}, + _make_summary_notification({ + "raw_severity_level": raw_value, + "severity_level": parsed_value, + }) + ) + for raw_value, parsed_value in ( + ("", NotificationSeverity.UNKNOWN), + (" ", NotificationSeverity.UNKNOWN), + ("Foo Bar", NotificationSeverity.UNKNOWN), + ("WARNING", NotificationSeverity.WARNING), + ("INFORMATION", NotificationSeverity.INFORMATION), + ) + ), + *( + ( + {"severity": value}, + _make_summary_notification() + ) + for value in t.cast(t.Iterable, + (True, False, 1, [], {}, 1.2345, None)) + ), + + # category + *( + ( + {"category": raw_value}, + _make_summary_notification({ + "raw_category": raw_value, + "category": parsed_value, + }) + ) + for raw_value, parsed_value in ( + ("", NotificationCategory.UNKNOWN), + (" ", NotificationCategory.UNKNOWN), + ("Foo Bar", NotificationCategory.UNKNOWN), + ("HINT", NotificationCategory.HINT), + ("UNRECOGNIZED", NotificationCategory.UNRECOGNIZED), + ("UNSUPPORTED", NotificationCategory.UNSUPPORTED), + ("PERFORMANCE", NotificationCategory.PERFORMANCE), + ("DEPRECATION", NotificationCategory.DEPRECATION), + ("GENERIC", NotificationCategory.GENERIC), + ("SECURITY", NotificationCategory.SECURITY), + ("TOPOLOGY", NotificationCategory.TOPOLOGY), + ) + ), + *( + ( + {"category": value}, + _make_summary_notification() + ) + for value in t.cast(t.Iterable, + (True, False, 1, [], {}, 1.2345, None)) + ), + + # position + *( + ( + {"position": raw_value}, + _make_summary_notification({"position": parsed_value}) + ) + for raw_value, parsed_value in ( + ( + {"line": 1, "column": 2, "offset": 3}, + SummaryInputPosition(line=1, column=2, offset=3) + ), + ( + {"line": -100, "column": -200, "offset": -300}, + SummaryInputPosition(line=-100, column=-200, offset=-300) + ), + ( + {"line": 0, "column": 0, "offset": 0}, + SummaryInputPosition(line=0, column=0, offset=0) + ), + # extraneous values are ignored + *( + ( + {"line": 1, "column": 2, "offset": 3, "extra": extra}, + SummaryInputPosition(line=1, column=2, offset=3) + ) + for extra in t.cast( + t.Iterable, + (None, 1, False, [], {}, "hi :)", "5", 1.2345) + ) + ), + ) + ), + # missing field/field of wrong type => position is default value + *( + ( + {"position": value}, + _make_summary_notification() + ) + for value in ( + *( + {"line": 1, "column": 2, "offset": 3, key: value} + for key in ("line", "column", "offset") + for value in t.cast(t.Iterable, + (1.2, None, False, True, [], {}, "1")) + ), + {"column": 2, "offset": 3}, + {"line": 1, "offset": 3}, + {"line": 1, "column": 2}, + ) + ), + ) +) +def test_notification_from_meta_data( + meta: t.Dict, + expected: SummaryNotification, +) -> None: + notification = SummaryNotification._from_metadata(meta) + assert notification == expected