Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier/common_dict_eval] Move height and height type to generic function #958

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions monitoring/monitorlib/fetch/rid.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,16 @@ class Position(ImplicitDict):

@staticmethod
def from_v19_rid_aircraft_position(
p: v19.api.RIDAircraftPosition, t: v19.api.StringBasedDateTime
p: v19.api.RIDAircraftPosition,
t: v19.api.StringBasedDateTime,
h: Optional[v19.api.RIDHeight],
) -> Position:
return Position(
lat=p.lat,
lng=p.lng,
alt=p.alt,
time=t.datetime,
height=None,
height=h,
accuracy_v=p.accuracy_v if "accuracy_v" in p else None,
accuracy_h=p.accuracy_h if "accuracy_h" in p else None,
)
Expand Down Expand Up @@ -237,6 +239,7 @@ def most_recent_position(
return Position.from_v19_rid_aircraft_position(
self.v19_value.current_state.position,
self.v19_value.current_state.timestamp,
self.height,
)
elif self.rid_version == RIDVersion.f3411_22a:
return Position.from_v22a_rid_aircraft_position(
Expand All @@ -254,7 +257,7 @@ def most_recent_position(
def recent_positions(self) -> List[Position]:
if self.rid_version == RIDVersion.f3411_19:
return [
Position.from_v19_rid_aircraft_position(p.position, p.time)
Position.from_v19_rid_aircraft_position(p.position, p.time, self.height)
for p in self.v19_value.recent_positions
]
elif self.rid_version == RIDVersion.f3411_22a:
Expand Down Expand Up @@ -407,6 +410,28 @@ def aircraft_type(
f"Cannot retrieve aircraft_type using RID version {self.rid_version}"
)

@property
def height(
self,
) -> Optional[Union[v19.api.RIDHeight, v22a.api.RIDHeight]]:
if self.rid_version == RIDVersion.f3411_19:
if not self.v19_value.has_field_with_value(
"current_state"
) or not self.v19_value.current_state.has_field_with_value("height"):
return None
return self.v19_value.current_state.height
elif self.rid_version == RIDVersion.f3411_22a:
if (
not self.most_recent_position
or not self.most_recent_position.has_field_with_value("height")
):
return None
return self.most_recent_position.height
else:
raise NotImplementedError(
f"Cannot retrieve aircraft_type using RID version {self.rid_version}"
)

def errors(self) -> List[str]:
try:
rid_version = self.rid_version
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import datetime
import math
from collections.abc import Callable
Expand All @@ -14,6 +16,7 @@
from uas_standards.astm.f3411.v22a.api import (
UASID,
HorizontalAccuracy,
RIDHeightReference,
RIDOperationalStatus,
SpeedAccuracy,
Time,
Expand Down Expand Up @@ -64,6 +67,8 @@ class RIDCommonDictionaryEvaluator(object):
"_evaluate_vertical_speed",
"_evaluate_speed",
"_evaluate_track",
"_evaluate_height",
"_evaluate_height_type",
]
details_evaluators = [
"_evaluate_ua_classification",
Expand Down Expand Up @@ -144,11 +149,6 @@ def evaluate_dp_flight(
observed_flight.most_recent_position,
participants,
)
self._evaluate_height(
injected_telemetry.get("height"),
observed_flight.most_recent_position.get("height"),
participants,
)

def evaluate_sp_details(
self,
Expand Down Expand Up @@ -536,66 +536,69 @@ def _evaluate_position(
message=f"Unsupported version {self._rid_version}: skipping position evaluation",
)

def _evaluate_height(
self,
height_inj: Optional[injection.RIDHeight],
height_obs: Optional[observation_api.RIDHeight],
participants: List[str],
):
if self._rid_version == RIDVersion.f3411_22a:
if height_obs is not None:
with self._test_scenario.check(
"Height consistency with Common Dictionary", participants
) as check:
if (
height_obs.reference
!= observation_api.RIDHeightReference.TakeoffLocation
and height_obs.reference
!= observation_api.RIDHeightReference.GroundLevel
):
check.record_failed(
f"Invalid height type: {height_obs.reference}",
details=f"The height type reference shall be either {observation_api.RIDHeightReference.TakeoffLocation} or {observation_api.RIDHeightReference.GroundLevel}",
)
def _evaluate_height(self, **generic_kwargs):
"""
Evaluates Height. Exactly one of sp_observed or dp_observed must be provided.
See as well `common_dictionary_evaluator.md`.

with self._test_scenario.check(
"Height is consistent with injected one", participants
) as check:
if not math.isclose(
height_obs.distance, height_inj.distance, abs_tol=1.0
):
check.record_failed(
"Observed Height is inconsistent with injected one",
details=f"Observed height: {height_obs} - injected: {height_inj}",
)
Raises:
ValueError: if a test operation wasn't performed correctly by uss_qualifier.
"""

with self._test_scenario.check(
"Height Type consistency with Common Dictionary", participants
) as check:
if (
height_obs.reference
!= observation_api.RIDHeightReference.TakeoffLocation
and height_obs.reference
!= observation_api.RIDHeightReference.GroundLevel
):
check.record_failed(
f"Invalid height type: {height_obs.reference}",
details=f"The height type reference shall be either {observation_api.RIDHeightReference.TakeoffLocation} or {observation_api.RIDHeightReference.GroundLevel}",
)
def value_comparator(v1: Optional[float], v2: Optional[float]) -> bool:

with self._test_scenario.check(
"Height Type is consistent with injected one", participants
) as check:
if height_obs.reference != height_inj.reference:
check.record_failed(
"Observed Height type is inconsistent with injected one",
details=f"Observed height: {height_obs} - injected: {height_inj}",
)
else:
self._test_scenario.record_note(
key="skip_reason",
message=f"Unsupported version {self._rid_version}: skipping Height and Height Type evaluation",
)
if v1 is None or v2 is None:
return False

return abs(v1 - v2) < constants.MinHeightResolution

self._generic_evaluator(
"position.height.distance",
"height.distance",
"most_recent_position.height.distance",
"Height",
None,
None,
False,
[None, -1000],
value_comparator,
**generic_kwargs,
)

def _evaluate_height_type(self, **generic_kwargs):
"""
Evaluates Height type. Exactly one of sp_observed or dp_observed must be provided.
See as well `common_dictionary_evaluator.md`.

Raises:
ValueError: if a test operation wasn't performed correctly by uss_qualifier.
"""

def value_validator(val: str) -> RIDHeightReference:
return RIDHeightReference(val)

def value_comparator(
v1: Optional[RIDHeightReference], v2: Optional[RIDHeightReference]
) -> bool:

return v1 == v2

self._generic_evaluator(
"position.height.reference",
"height.reference",
"most_recent_position.height.reference",
"Height type",
value_validator,
None,
False,
[
None,
RIDHeightReference.TakeoffLocation,
RIDHeightReference.GroundLevel,
],
value_comparator,
**generic_kwargs,
)

def _evaluate_operator_location(
self,
Expand Down Expand Up @@ -1188,7 +1191,7 @@ def _generic_evaluator(
value_validator: Optional[Callable[[T], T2]],
observed_value_validator: Optional[Callable[[PendingCheck, T2], None]],
injection_required_field: bool,
unknown_value: Optional[T2],
unknown_value: Optional[T2 | List[T2]],
value_comparator: Callable[[Optional[T2], Optional[T2]], bool],
injected: Union[
injection.TestFlight,
Expand Down Expand Up @@ -1217,7 +1220,7 @@ def _generic_evaluator(
value_validator: If not None, pass values through this function. You may raise ValueError to indicate errors.
observed_value_validator: If not None, will be called with check and observed value, for additional verifications
injection_required_field: Boolean to indicate we need to check the case where nothing has been injected (C6)
unknown_value: The default value that needs to be returned when nothing has been injected
unknown_value: The default value that needs to be returned when nothing has been injected. Should multiple ones be accepted, use a list there.
value_comparator: Function that need to return True if both parameters are equal
injected: injected data (flight, telemetry or details).
sp_observed: flight (or details) observed through the SP API.
Expand All @@ -1237,7 +1240,7 @@ def dotted_get(obj: Any, key: str) -> Optional[T]:
if isinstance(val, dict) and k in val:
val = val[k]
else:
val = getattr(val, k)
val = getattr(val, k, None)
return val

injected_val: Optional[T] = dotted_get(injected, injected_field_name)
Expand Down Expand Up @@ -1307,7 +1310,10 @@ def dotted_get(obj: Any, key: str) -> Optional[T]:
f"Invalid {field_human_name} value injected. Injection is marked as required, but we injected a None value. This should have been caught by the injection api."
)

if observed_val != unknown_value: # C6 / C10
if not isinstance(unknown_value, list):
unknown_value = [unknown_value]

if observed_val not in unknown_value: # C6 / C10
check.record_failed(
f"{field_human_name} is inconsistent, expected '{unknown_value}' since no value was injected",
details=f"USS returned the UA type {observed_val} yet no value was injected. Since '{field_human_name}' is a required field of SP API, the SP should map this to '{unknown_value}' and the DP should expose the same value.",
Expand Down
Loading
Loading