Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Laikad: Use process for parsing orbits #24769

Merged
merged 3 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 36 additions & 30 deletions selfdrive/locationd/laikad.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
import threading
import time
from typing import List
from multiprocessing import Process, Queue
from typing import List, Optional

import numpy as np
from collections import defaultdict
Expand All @@ -27,14 +27,17 @@ class Laikad:
def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)):
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
self.gnss_kf = GNSSKalman(GENERATED_DIR)
self.latest_time_msg = None
self.orbit_p: Optional[Process] = None
self.orbit_q = Queue()

def process_ublox_msg(self, ublox_msg, ublox_mono_time: int):
if ublox_msg.which == 'measurementReport':
report = ublox_msg.measurementReport
new_meas = read_raw_ublox(report)
if report.gpsWeek > 0:
self.latest_time_msg = GPSTime(report.gpsWeek, report.rcvTow)
latest_msg_t = GPSTime(report.gpsWeek, report.rcvTow)
self.fetch_orbits(latest_msg_t + SECS_IN_MIN, block=False)
new_meas = read_raw_ublox(report)

measurements = process_measurements(new_meas, self.astro_dog)
pos_fix = calc_pos_fix(measurements, min_measurements=4)
# To get a position fix a minimum of 5 measurements are needed.
Expand Down Expand Up @@ -104,18 +107,28 @@ def init_gnss_localizer(self, est_pos):

self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)

def orbit_thread(self, end_event: threading.Event):
while not end_event.is_set():
if self.latest_time_msg:
self.fetch_orbits(self.latest_time_msg + SECS_IN_MIN)
time.sleep(0.1)
def get_orbit_data(self, t: GPSTime, queue):
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
start_time = time.monotonic()
self.astro_dog.get_orbit_data(t, only_predictions=True)
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s")
queue.put((self.astro_dog.orbits, self.astro_dog.orbit_fetched_times))

def fetch_orbits(self, t: GPSTime):
def fetch_orbits(self, t: GPSTime, block):
if t not in self.astro_dog.orbit_fetched_times:
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
start_time = time.monotonic()
self.astro_dog.get_orbit_data(t, only_predictions=True)
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s")
if self.orbit_p is None:
self.orbit_p = Process(target=self.get_orbit_data, args=(t, self.orbit_q))
self.orbit_p.start()
ret = None
if block:
ret = self.orbit_q.get(block=True)
elif not self.orbit_q.empty():
ret = self.orbit_q.get()

if ret:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.orbit_p.join()
self.orbit_p = None


def create_measurement_msg(meas: GNSSMeasurement):
Expand Down Expand Up @@ -162,21 +175,14 @@ def main():
pm = messaging.PubMaster(['gnssMeasurements'])

laikad = Laikad()

end_event = threading.Event()
threading.Thread(target=laikad.orbit_thread, args=(end_event,)).start()
try:
while not end_event.is_set():
sm.update()

if sm.updated['ubloxGnss']:
ublox_msg = sm['ubloxGnss']
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'])
if msg is not None:
pm.send('gnssMeasurements', msg)
except (KeyboardInterrupt, SystemExit):
end_event.set()
raise
while True:
sm.update()

if sm.updated['ubloxGnss']:
ublox_msg = sm['ubloxGnss']
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'])
if msg is not None:
pm.send('gnssMeasurements', msg)


if __name__ == "__main__":
Expand Down
10 changes: 3 additions & 7 deletions selfdrive/locationd/test/test_laikad.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,14 @@ def test_laika_get_orbits(self):
if len(new_meas) != 0:
first_gps_time = new_meas[0].recv_time
break
# Pretend thread has loaded the orbits on startup by using the time of the first gps message.
laikad.fetch_orbits(first_gps_time)
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad.fetch_orbits(first_gps_time, block=True)
self.assertEqual(29, len(laikad.astro_dog.orbits.keys()))
correct_msgs = verify_messages(self.logs, laikad)
correct_msgs_expected = 560
self.assertEqual(correct_msgs_expected, len(correct_msgs))
self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))

@unittest.skip("Use to debug live data")
def test_laika_get_orbits_now(self):
laikad = Laikad(auto_update=False)
laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()))
laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True)
prn = "G01"
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
prn = "R01"
Expand Down