Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_locationd: don't redownload logs #29929

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions selfdrive/locationd/test/test_locationd_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ def get_nested_keys(msg, keys):
return data


def run_scenarios(scenario):
logs = list(LogReader(get_url(TEST_ROUTE, TEST_SEG_NUM)))
def run_scenarios(scenario, logs):
if scenario == Scenario.BASE:
pass

Expand Down Expand Up @@ -104,14 +103,19 @@ class TestLocationdScenarios(unittest.TestCase):
- locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK)
- faulty values should be ignored, with appropriate flags set
"""

@classmethod
def setUpClass(cls):
cls.logs = list(LogReader(get_url(TEST_ROUTE, TEST_SEG_NUM)))

def test_base(self):
"""
Test: unchanged log
Expected Result:
- yaw_rate: unchanged
- roll: unchanged
"""
orig_data, replayed_data = run_scenarios(Scenario.BASE)
orig_data, replayed_data = run_scenarios(Scenario.BASE, self.logs)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))

Expand All @@ -123,7 +127,7 @@ def test_gps_off(self):
- roll:
- gpsOK: False
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF)
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF, self.logs)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.all(replayed_data['gps_flag'] == 0.0))
Expand All @@ -136,7 +140,7 @@ def test_gps_off_midway(self):
- roll:
- gpsOK: True for the first half, False for the second half
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF_MIDWAY)
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF_MIDWAY, self.logs)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.diff(replayed_data['gps_flag'])[512] == -1.0)
Expand All @@ -149,7 +153,7 @@ def test_gps_on_midway(self):
- roll:
- gpsOK: False for the first half, True for the second half
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_ON_MIDWAY)
orig_data, replayed_data = run_scenarios(Scenario.GPS_ON_MIDWAY, self.logs)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(1.5)))
self.assertTrue(np.diff(replayed_data['gps_flag'])[505] == 1.0)
Expand All @@ -162,7 +166,7 @@ def test_gps_tunnel(self):
- roll:
- gpsOK: False for the middle section, True for the rest
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_TUNNEL)
orig_data, replayed_data = run_scenarios(Scenario.GPS_TUNNEL, self.logs)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.diff(replayed_data['gps_flag'])[213] == -1.0)
Expand All @@ -176,7 +180,7 @@ def test_gyro_off(self):
- roll: 0
- sensorsOK: False
"""
_, replayed_data = run_scenarios(Scenario.GYRO_OFF)
_, replayed_data = run_scenarios(Scenario.GYRO_OFF, self.logs)
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0))
self.assertTrue(np.allclose(replayed_data['roll'], 0.0))
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0))
Expand All @@ -189,7 +193,7 @@ def test_gyro_spikes(self):
- roll: unchanged
- inputsOK: False for some time after the spike, True for the rest
"""
orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY)
orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY, self.logs)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.diff(replayed_data['inputs_flag'])[500] == -1.0)
Expand All @@ -203,7 +207,7 @@ def test_accel_off(self):
- roll: 0
- sensorsOK: False
"""
_, replayed_data = run_scenarios(Scenario.ACCEL_OFF)
_, replayed_data = run_scenarios(Scenario.ACCEL_OFF, self.logs)
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0))
self.assertTrue(np.allclose(replayed_data['roll'], 0.0))
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0))
Expand All @@ -214,7 +218,7 @@ def test_accel_spikes(self):
Test: an accelerometer spike in the middle of the segment
Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes.
"""
orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY)
orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY, self.logs)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))

Expand Down