Skip to content

Commit

Permalink
[uss_qualifier/common_dict_eval] Move height and height type to gener…
Browse files Browse the repository at this point in the history
…ic function (#958)
  • Loading branch information
mickmis authored Feb 12, 2025
1 parent 3c406f5 commit 61c1a95
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 204 deletions.
31 changes: 28 additions & 3 deletions monitoring/monitorlib/fetch/rid.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,16 @@ class Position(ImplicitDict):

@staticmethod
def from_v19_rid_aircraft_position(
p: v19.api.RIDAircraftPosition, t: v19.api.StringBasedDateTime
p: v19.api.RIDAircraftPosition,
t: v19.api.StringBasedDateTime,
h: Optional[v19.api.RIDHeight],
) -> Position:
return Position(
lat=p.lat,
lng=p.lng,
alt=p.alt,
time=t.datetime,
height=None,
height=h,
accuracy_v=p.accuracy_v if "accuracy_v" in p else None,
accuracy_h=p.accuracy_h if "accuracy_h" in p else None,
)
Expand Down Expand Up @@ -237,6 +239,7 @@ def most_recent_position(
return Position.from_v19_rid_aircraft_position(
self.v19_value.current_state.position,
self.v19_value.current_state.timestamp,
self.height,
)
elif self.rid_version == RIDVersion.f3411_22a:
return Position.from_v22a_rid_aircraft_position(
Expand All @@ -254,7 +257,7 @@ def most_recent_position(
def recent_positions(self) -> List[Position]:
if self.rid_version == RIDVersion.f3411_19:
return [
Position.from_v19_rid_aircraft_position(p.position, p.time)
Position.from_v19_rid_aircraft_position(p.position, p.time, self.height)
for p in self.v19_value.recent_positions
]
elif self.rid_version == RIDVersion.f3411_22a:
Expand Down Expand Up @@ -407,6 +410,28 @@ def aircraft_type(
f"Cannot retrieve aircraft_type using RID version {self.rid_version}"
)

@property
def height(
self,
) -> Optional[Union[v19.api.RIDHeight, v22a.api.RIDHeight]]:
if self.rid_version == RIDVersion.f3411_19:
if not self.v19_value.has_field_with_value(
"current_state"
) or not self.v19_value.current_state.has_field_with_value("height"):
return None
return self.v19_value.current_state.height
elif self.rid_version == RIDVersion.f3411_22a:
if (
not self.most_recent_position
or not self.most_recent_position.has_field_with_value("height")
):
return None
return self.most_recent_position.height
else:
raise NotImplementedError(
f"Cannot retrieve aircraft_type using RID version {self.rid_version}"
)

def errors(self) -> List[str]:
try:
rid_version = self.rid_version
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import datetime
import math
from collections.abc import Callable
Expand All @@ -14,6 +16,7 @@
from uas_standards.astm.f3411.v22a.api import (
UASID,
HorizontalAccuracy,
RIDHeightReference,
RIDOperationalStatus,
SpeedAccuracy,
Time,
Expand Down Expand Up @@ -64,6 +67,8 @@ class RIDCommonDictionaryEvaluator(object):
"_evaluate_vertical_speed",
"_evaluate_speed",
"_evaluate_track",
"_evaluate_height",
"_evaluate_height_type",
]
details_evaluators = [
"_evaluate_ua_classification",
Expand Down Expand Up @@ -144,11 +149,6 @@ def evaluate_dp_flight(
observed_flight.most_recent_position,
participants,
)
self._evaluate_height(
injected_telemetry.get("height"),
observed_flight.most_recent_position.get("height"),
participants,
)

def evaluate_sp_details(
self,
Expand Down Expand Up @@ -536,66 +536,69 @@ def _evaluate_position(
message=f"Unsupported version {self._rid_version}: skipping position evaluation",
)

def _evaluate_height(
self,
height_inj: Optional[injection.RIDHeight],
height_obs: Optional[observation_api.RIDHeight],
participants: List[str],
):
if self._rid_version == RIDVersion.f3411_22a:
if height_obs is not None:
with self._test_scenario.check(
"Height consistency with Common Dictionary", participants
) as check:
if (
height_obs.reference
!= observation_api.RIDHeightReference.TakeoffLocation
and height_obs.reference
!= observation_api.RIDHeightReference.GroundLevel
):
check.record_failed(
f"Invalid height type: {height_obs.reference}",
details=f"The height type reference shall be either {observation_api.RIDHeightReference.TakeoffLocation} or {observation_api.RIDHeightReference.GroundLevel}",
)
def _evaluate_height(self, **generic_kwargs):
"""
Evaluates Height. Exactly one of sp_observed or dp_observed must be provided.
See as well `common_dictionary_evaluator.md`.
with self._test_scenario.check(
"Height is consistent with injected one", participants
) as check:
if not math.isclose(
height_obs.distance, height_inj.distance, abs_tol=1.0
):
check.record_failed(
"Observed Height is inconsistent with injected one",
details=f"Observed height: {height_obs} - injected: {height_inj}",
)
Raises:
ValueError: if a test operation wasn't performed correctly by uss_qualifier.
"""

with self._test_scenario.check(
"Height Type consistency with Common Dictionary", participants
) as check:
if (
height_obs.reference
!= observation_api.RIDHeightReference.TakeoffLocation
and height_obs.reference
!= observation_api.RIDHeightReference.GroundLevel
):
check.record_failed(
f"Invalid height type: {height_obs.reference}",
details=f"The height type reference shall be either {observation_api.RIDHeightReference.TakeoffLocation} or {observation_api.RIDHeightReference.GroundLevel}",
)
def value_comparator(v1: Optional[float], v2: Optional[float]) -> bool:

with self._test_scenario.check(
"Height Type is consistent with injected one", participants
) as check:
if height_obs.reference != height_inj.reference:
check.record_failed(
"Observed Height type is inconsistent with injected one",
details=f"Observed height: {height_obs} - injected: {height_inj}",
)
else:
self._test_scenario.record_note(
key="skip_reason",
message=f"Unsupported version {self._rid_version}: skipping Height and Height Type evaluation",
)
if v1 is None or v2 is None:
return False

return abs(v1 - v2) < constants.MinHeightResolution

self._generic_evaluator(
"position.height.distance",
"height.distance",
"most_recent_position.height.distance",
"Height",
None,
None,
False,
[None, -1000],
value_comparator,
**generic_kwargs,
)

def _evaluate_height_type(self, **generic_kwargs):
"""
Evaluates Height type. Exactly one of sp_observed or dp_observed must be provided.
See as well `common_dictionary_evaluator.md`.
Raises:
ValueError: if a test operation wasn't performed correctly by uss_qualifier.
"""

def value_validator(val: str) -> RIDHeightReference:
return RIDHeightReference(val)

def value_comparator(
v1: Optional[RIDHeightReference], v2: Optional[RIDHeightReference]
) -> bool:

return v1 == v2

self._generic_evaluator(
"position.height.reference",
"height.reference",
"most_recent_position.height.reference",
"Height type",
value_validator,
None,
False,
[
None,
RIDHeightReference.TakeoffLocation,
RIDHeightReference.GroundLevel,
],
value_comparator,
**generic_kwargs,
)

def _evaluate_operator_location(
self,
Expand Down Expand Up @@ -1188,7 +1191,7 @@ def _generic_evaluator(
value_validator: Optional[Callable[[T], T2]],
observed_value_validator: Optional[Callable[[PendingCheck, T2], None]],
injection_required_field: bool,
unknown_value: Optional[T2],
unknown_value: Optional[T2 | List[T2]],
value_comparator: Callable[[Optional[T2], Optional[T2]], bool],
injected: Union[
injection.TestFlight,
Expand Down Expand Up @@ -1217,7 +1220,7 @@ def _generic_evaluator(
value_validator: If not None, pass values through this function. You may raise ValueError to indicate errors.
observed_value_validator: If not None, will be called with check and observed value, for additional verifications
injection_required_field: Boolean to indicate we need to check the case where nothing has been injected (C6)
unknown_value: The default value that needs to be returned when nothing has been injected
unknown_value: The default value that needs to be returned when nothing has been injected. Should multiple ones be accepted, use a list there.
value_comparator: Function that need to return True if both parameters are equal
injected: injected data (flight, telemetry or details).
sp_observed: flight (or details) observed through the SP API.
Expand All @@ -1237,7 +1240,7 @@ def dotted_get(obj: Any, key: str) -> Optional[T]:
if isinstance(val, dict) and k in val:
val = val[k]
else:
val = getattr(val, k)
val = getattr(val, k, None)
return val

injected_val: Optional[T] = dotted_get(injected, injected_field_name)
Expand Down Expand Up @@ -1307,7 +1310,10 @@ def dotted_get(obj: Any, key: str) -> Optional[T]:
f"Invalid {field_human_name} value injected. Injection is marked as required, but we injected a None value. This should have been caught by the injection api."
)

if observed_val != unknown_value: # C6 / C10
if not isinstance(unknown_value, list):
unknown_value = [unknown_value]

if observed_val not in unknown_value: # C6 / C10
check.record_failed(
f"{field_human_name} is inconsistent, expected '{unknown_value}' since no value was injected",
details=f"USS returned the UA type {observed_val} yet no value was injected. Since '{field_human_name}' is a required field of SP API, the SP should map this to '{unknown_value}' and the DP should expose the same value.",
Expand Down
Loading

0 comments on commit 61c1a95

Please sign in to comment.