4
4
import random
5
5
import unittest
6
6
import time
7
+ import capnp
7
8
from cffi import FFI
8
9
9
10
from cereal import log
10
11
import cereal .messaging as messaging
12
+ from cereal .services import service_list
11
13
from common .params import Params
12
14
13
15
from selfdrive .manager .process_config import managed_processes
@@ -103,13 +105,15 @@ def test_posenet_spike(self):
103
105
ret = self .localizer_get_msg ()
104
106
self .assertFalse (ret .liveLocationKalman .posenetOK )
105
107
108
+
106
109
class TestLocationdProc (unittest .TestCase ):
107
110
MAX_WAITS = 1000
111
+ LLD_MSGS = {'gpsLocationExternal' , 'cameraOdometry' , 'carState' , 'sensorEvents' , 'liveCalibration' }
108
112
109
113
def setUp (self ):
110
114
random .seed (123489234 )
111
115
112
- self .pm = messaging .PubMaster ({ 'gpsLocationExternal' , 'cameraOdometry' } )
116
+ self .pm = messaging .PubMaster (self . LLD_MSGS )
113
117
114
118
managed_processes ['locationd' ].prepare ()
115
119
managed_processes ['locationd' ].start ()
@@ -127,43 +131,53 @@ def send_msg(self, msg):
127
131
waits_left -= 1
128
132
time .sleep (0.0001 )
129
133
130
- def test_params_gps (self ):
131
- # first reset params
132
- Params ().put ('LastGPSPosition' , json .dumps ({"latitude" : 0.0 , "longitude" : 0.0 , "altitude" : 0.0 }))
133
-
134
- lat = 30 + (random .random () * 10.0 )
135
- lon = - 70 + (random .random () * 10.0 )
136
- alt = 5 + (random .random () * 10.0 )
134
+ def get_fake_msg (self , name , t ):
135
+ try :
136
+ msg = messaging .new_message (name )
137
+ except capnp .lib .capnp .KjException :
138
+ msg = messaging .new_message (name , 0 )
137
139
138
- for _ in range (1000 ): # because of kalman filter, send often
139
- msg = messaging .new_message ('gpsLocationExternal' )
140
- msg .logMonoTime = 0
140
+ if name == "gpsLocationExternal" :
141
141
msg .gpsLocationExternal .flags = 1
142
142
msg .gpsLocationExternal .verticalAccuracy = 1.0
143
143
msg .gpsLocationExternal .speedAccuracy = 1.0
144
144
msg .gpsLocationExternal .bearingAccuracyDeg = 1.0
145
145
msg .gpsLocationExternal .vNED = [0.0 , 0.0 , 0.0 ]
146
- msg .gpsLocationExternal .latitude = lat
147
- msg .gpsLocationExternal .longitude = lon
148
- msg .gpsLocationExternal .altitude = alt
149
- self .send_msg (msg )
150
-
151
- for _ in range (250 ): # params is only written so often
152
- msg = messaging .new_message ('cameraOdometry' )
153
- msg .logMonoTime = 0
146
+ msg .gpsLocationExternal .latitude = self .lat
147
+ msg .gpsLocationExternal .longitude = self .lon
148
+ msg .gpsLocationExternal .altitude = self .alt
149
+ elif name == 'cameraOdometry' :
154
150
msg .cameraOdometry .rot = [0.0 , 0.0 , 0.0 ]
155
151
msg .cameraOdometry .rotStd = [0.0 , 0.0 , 0.0 ]
156
152
msg .cameraOdometry .trans = [0.0 , 0.0 , 0.0 ]
157
153
msg .cameraOdometry .transStd = [0.0 , 0.0 , 0.0 ]
158
- self .send_msg (msg )
154
+ msg .logMonoTime = t
155
+ return msg
159
156
157
+ def test_params_gps (self ):
158
+ # first reset params
159
+ Params ().delete ('LastGPSPosition' )
160
+
161
+ self .lat = 30 + (random .random () * 10.0 )
162
+ self .lon = - 70 + (random .random () * 10.0 )
163
+ self .alt = 5 + (random .random () * 10.0 )
164
+ self .fake_duration = 90 # secs
165
+ # get fake messages at the correct frequency, listed in services.py
166
+ fake_msgs = []
167
+ for sec in range (self .fake_duration ):
168
+ for name in self .LLD_MSGS :
169
+ for j in range (int (service_list [name ].frequency )):
170
+ fake_msgs .append (self .get_fake_msg (name , int ((sec + j / service_list [name ].frequency ) * 1e9 )))
171
+
172
+ for fake_msg in sorted (fake_msgs , key = lambda x : x .logMonoTime ):
173
+ self .send_msg (fake_msg )
160
174
time .sleep (1 ) # wait for async params write
161
175
162
176
lastGPS = json .loads (Params ().get ('LastGPSPosition' ))
163
177
164
- self .assertAlmostEqual (lastGPS ['latitude' ], lat , places = 3 )
165
- self .assertAlmostEqual (lastGPS ['longitude' ], lon , places = 3 )
166
- self .assertAlmostEqual (lastGPS ['altitude' ], alt , places = 3 )
178
+ self .assertAlmostEqual (lastGPS ['latitude' ], self . lat , places = 3 )
179
+ self .assertAlmostEqual (lastGPS ['longitude' ], self . lon , places = 3 )
180
+ self .assertAlmostEqual (lastGPS ['altitude' ], self . alt , places = 3 )
167
181
168
182
169
183
if __name__ == "__main__" :
0 commit comments