Skip to content

Commit 9504037

Browse files
author
Vehicle Researcher
committed
Squashed 'cereal/' changes from b8382bb..01942b8
01942b8 add TODO b74a456 don't hardcode the lists ed5a4bf add face stds 396a2bb add can error counter to controlsState c6b5c73 Switch default to msgq (#21) a457ffa Fix indentation in readme.md a1fc8c7 explicitly mention Python for syntax colouring (#20) 19e2393 Fix expected for cameraOdometry and liveCalibration e7d2f97 Add radar comm issue error db64cd4 Reserve safety #21 for VAG PQ35/PQ46/NMS (#19) 79d638d separate honda safety models between Bosch Giraffe and Bosch Nidec 2614a65 better name b6b84cd add longitudinal 78f5934 Add canRxErrs to health 6758899 qlog liveCalibration df80b87 add more stuff to fw log in CarParams a87805a fix doxs 4746b20 got doxed 21cf3f5 build on mac 31ac47c Add carUnrecognized event git-subtree-dir: cereal git-subtree-split: 01942b8
1 parent e3b2117 commit 9504037

11 files changed

+122
-40
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ libmessaging.*
1010
libmessaging_shared.*
1111
services.h
1212
.sconsign.dblite
13-
libcereal_shared.so
13+
libcereal_shared.*
1414

README.md

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
What is cereal?
2+
----
3+
4+
cereal is both a messaging spec for robotics systems as well as generic high performance IPC pub sub messaging with a single publisher and multiple subscribers.
5+
6+
Imagine this use case:
7+
* A sensor process reads gyro measurements directly from an IMU and publishes a sensorEvents packet
8+
* A calibration process subscribes to the sensorEvents packet to use the IMU
9+
* A localization process subscribes to the sensorEvents packet to use the IMU also
10+
11+
12+
Messaging Spec
13+
----
14+
15+
You'll find the message types in [log.capnp](log.capnp). It uses [Cap'n proto](https://capnproto.org/capnp-tool.html) and defines one struct called Event.
16+
17+
All Events have a logMonoTime and a valid. Then a big union defines the packet type.
18+
19+
20+
Pub Sub Backends
21+
----
22+
23+
cereal supports two backends, one based on [zmq](https://zeromq.org/), the other called msgq, a custom pub sub based on shared memory that doesn't require the bytes to pass through the kernel.
24+
25+
Example
26+
---
27+
```python
28+
import cereal.messaging as messaging
29+
30+
# in subscriber
31+
sm = messaging.SubMaster(['sensorEvents'])
32+
while 1:
33+
sm.update()
34+
print(sm['sensorEvents'])
35+
36+
# in publisher
37+
pm = messaging.PubMaster(['sensorEvents'])
38+
dat = messaging.new_message()
39+
dat.init('sensorEvents', 1)
40+
dat.sensorEvents[0] = {"gyro": {"v": [0.1, -0.1, 0.1]}}
41+
pm.send('sensorEvents', dat)
42+
```

SConscript

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ cereal_objects = env.SharedObject([
2929
])
3030

3131
env.Library('cereal', cereal_objects)
32-
env.SharedLibrary('cereal_shared', cereal_objects)
32+
env.SharedLibrary('cereal_shared', cereal_objects, LIBS=["capnp_c"])
3333

3434
cereal_dir = Dir('.')
3535
services_h = env.Command(
@@ -49,7 +49,7 @@ Depends('messaging/impl_zmq.cc', services_h)
4949

5050
# note, this rebuilds the deps shared, zmq is statically linked to make APK happy
5151
# TODO: get APK to load system zmq to remove the static link
52-
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else []
52+
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [zmq]
5353
env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib)
5454

5555
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq'])

car.capnp

+16-4
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ struct CarEvent @0x9b1657f34caf3ad3 {
8888
lowMemory @63;
8989
stockAeb @64;
9090
ldw @65;
91+
carUnrecognized @66;
92+
radarCommIssue @67;
9193
}
9294
}
9395

@@ -410,11 +412,11 @@ struct CarParams {
410412

411413
enum SafetyModel {
412414
silent @0;
413-
honda @1;
415+
hondaNidec @1;
414416
toyota @2;
415417
elm327 @3;
416418
gm @4;
417-
hondaBosch @5;
419+
hondaBoschGiraffe @5;
418420
ford @6;
419421
cadillac @7;
420422
hyundai @8;
@@ -428,7 +430,9 @@ struct CarParams {
428430
toyotaIpas @16;
429431
allOutput @17;
430432
gmAscm @18;
431-
noOutput @19; # like silent but with silent CAN TXs
433+
noOutput @19; # like silent but without silent CAN TXs
434+
hondaBoschHarness @20;
435+
volkswagenPq @21;
432436
}
433437

434438
enum SteerControlType {
@@ -444,13 +448,21 @@ struct CarParams {
444448

445449
struct CarFw {
446450
ecu @0 :Ecu;
447-
fwVersion @1 :Text;
451+
fwVersion @1 :Data;
452+
address @2: UInt32;
453+
subAddress @3: UInt8;
448454
}
449455

450456
enum Ecu {
451457
eps @0;
452458
esp @1;
453459
fwdRadar @2;
454460
fwdCamera @3;
461+
engine @4;
462+
unknown @5;
463+
464+
# Toyota only
465+
dsu @6;
466+
apgs @7;
455467
}
456468
}

log.capnp

+11
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ struct HealthData {
310310
hasGps @6 :Bool;
311311
canSendErrs @7 :UInt32;
312312
canFwdErrs @8 :UInt32;
313+
canRxErrs @19 :UInt32;
313314
gmlanSendErrs @9 :UInt32;
314315
hwType @10 :HwType;
315316
fanSpeedRpm @11 :UInt16;
@@ -484,6 +485,7 @@ struct ControlsState @0x97ff69c53601abf1 {
484485
decelForTurn @47 :Bool;
485486

486487
decelForModel @54 :Bool;
488+
canErrorCounter @57 :UInt32;
487489

488490
lateralControlState :union {
489491
indiState @52 :LateralINDIState;
@@ -575,6 +577,7 @@ struct ModelData {
575577
leadFuture @7 :LeadData;
576578
speed @8 :List(Float32);
577579
meta @10 :MetaData;
580+
longitudinal @11 :LongitudinalData;
578581

579582
struct PathData {
580583
points @0 :List(Float32);
@@ -605,13 +608,19 @@ struct ModelData {
605608
yuvCorrection @5 :List(Float32);
606609
inputTransform @6 :List(Float32);
607610
}
611+
608612
struct MetaData {
609613
engagedProb @0 :Float32;
610614
desirePrediction @1 :List(Float32);
611615
brakeDisengageProb @2 :Float32;
612616
gasDisengageProb @3 :Float32;
613617
steerOverrideProb @4 :Float32;
614618
}
619+
620+
struct LongitudinalData {
621+
speeds @0 :List(Float32);
622+
accelerations @1 :List(Float32);
623+
}
615624
}
616625

617626
struct CalibrationFeatures {
@@ -1757,6 +1766,8 @@ struct DriverMonitoring {
17571766
leftBlinkProb @8 :Float32;
17581767
rightBlinkProb @9 :Float32;
17591768
irPwrDEPRECATED @10 :Float32;
1769+
faceOrientationStd @11 :List(Float32);
1770+
facePositionStd @12 :List(Float32);
17601771
}
17611772

17621773
struct Boot {

messaging/__init__.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# must be build with scons
22
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
33
from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error
4+
import capnp
45

56
assert MultiplePublishersError
67
assert MessagingError
@@ -116,6 +117,7 @@ def recv_one_retry(sock):
116117
if dat is not None:
117118
return log.Event.from_bytes(dat)
118119

120+
# TODO: This does not belong in messaging
119121
def get_one_can(logcan):
120122
while True:
121123
can = recv_one_retry(logcan)
@@ -147,12 +149,12 @@ def __init__(self, services, ignore_alive=None, addr="127.0.0.1"):
147149
self.freq[s] = service_list[s].frequency
148150

149151
data = new_message()
150-
if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan',
151-
'ethernetData', 'cellInfo', 'wifiScan',
152-
'trafficEvents', 'orbObservation', 'carEvents']:
153-
data.init(s, 0)
154-
else:
152+
try:
155153
data.init(s)
154+
except capnp.lib.capnp.KjException:
155+
# lists
156+
data.init(s, 0)
157+
156158
self.data[s] = getattr(data, s)
157159
self.logMonoTime[s] = 0
158160
self.valid[s] = data.valid

messaging/bridge.cc

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include <csignal>
55
#include <map>
66

7+
typedef void (*sighandler_t)(int sig);
8+
79
#include "services.h"
810

911
#include "impl_msgq.hpp"

messaging/impl_msgq.cc

+11-6
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){
8585
msgq_msg_t msg;
8686

8787
MSGQMessage *r = NULL;
88-
r = NULL;
8988

9089
int rc = msgq_msg_recv(&msg, q);
9190

@@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){
109108
}
110109
}
111110

112-
if (rc > 0){
113-
r = new MSGQMessage;
114-
r->takeOwnership(msg.data, msg.size);
115-
}
116-
errno = msgq_do_exit ? EINTR : 0;
117111

118112
if (!non_blocking){
119113
std::signal(SIGINT, prev_handler_sigint);
120114
std::signal(SIGTERM, prev_handler_sigterm);
121115
}
122116

117+
errno = msgq_do_exit ? EINTR : 0;
118+
119+
if (rc > 0){
120+
if (msgq_do_exit){
121+
msgq_msg_close(&msg); // Free unused message on exit
122+
} else {
123+
r = new MSGQMessage;
124+
r->takeOwnership(msg.data, msg.size);
125+
}
126+
}
127+
123128
return (Message*)r;
124129
}
125130

messaging/messaging.cc

+12-12
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,20 @@
44

55
Context * Context::create(){
66
Context * c;
7-
if (std::getenv("MSGQ")){
8-
c = new MSGQContext();
9-
} else {
7+
if (std::getenv("ZMQ")){
108
c = new ZMQContext();
9+
} else {
10+
c = new MSGQContext();
1111
}
1212
return c;
1313
}
1414

1515
SubSocket * SubSocket::create(){
1616
SubSocket * s;
17-
if (std::getenv("MSGQ")){
18-
s = new MSGQSubSocket();
19-
} else {
17+
if (std::getenv("ZMQ")){
2018
s = new ZMQSubSocket();
19+
} else {
20+
s = new MSGQSubSocket();
2121
}
2222
return s;
2323
}
@@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri
6060

6161
PubSocket * PubSocket::create(){
6262
PubSocket * s;
63-
if (std::getenv("MSGQ")){
64-
s = new MSGQPubSocket();
65-
} else {
63+
if (std::getenv("ZMQ")){
6664
s = new ZMQPubSocket();
65+
} else {
66+
s = new MSGQPubSocket();
6767
}
6868
return s;
6969
}
@@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){
8282

8383
Poller * Poller::create(){
8484
Poller * p;
85-
if (std::getenv("MSGQ")){
86-
p = new MSGQPoller();
87-
} else {
85+
if (std::getenv("ZMQ")){
8886
p = new ZMQPoller();
87+
} else {
88+
p = new MSGQPoller();
8989
}
9090
return p;
9191
}

messaging/msgq.cc

+16-8
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323

2424
#include "msgq.hpp"
2525

26-
void sigusr1_handler(int signal) {
27-
assert(signal == SIGUSR1);
26+
void sigusr2_handler(int signal) {
27+
assert(signal == SIGUSR2);
2828
}
2929

3030
uint64_t msgq_get_uid(void){
@@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
8080
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
8181
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
8282

83-
std::signal(SIGUSR1, sigusr1_handler);
83+
std::signal(SIGUSR2, sigusr2_handler);
8484

8585
const char * prefix = "/dev/shm/";
8686
char * full_path = new char[strlen(path) + strlen(prefix) + 1];
@@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){
136136

137137

138138
void msgq_init_publisher(msgq_queue_t * q) {
139-
std::cout << "Starting publisher" << std::endl;
139+
//std::cout << "Starting publisher" << std::endl;
140140
uint64_t uid = msgq_get_uid();
141141

142142
*q->write_uid = uid;
@@ -150,6 +150,15 @@ void msgq_init_publisher(msgq_queue_t * q) {
150150
q->write_uid_local = uid;
151151
}
152152

153+
static void thread_signal(uint32_t tid) {
154+
#ifndef SYS_tkill
155+
// TODO: this won't work for multithreaded programs
156+
kill(tid, SIGUSR2);
157+
#else
158+
syscall(SYS_tkill, tid, SIGUSR2);
159+
#endif
160+
}
161+
153162
void msgq_init_subscriber(msgq_queue_t * q) {
154163
assert(q != NULL);
155164
assert(q->num_readers != NULL);
@@ -173,7 +182,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
173182
*q->read_uids[i] = 0;
174183

175184
// Wake up reader in case they are in a poll
176-
syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1);
185+
thread_signal(old_uid & 0xFFFFFFFF);
177186
}
178187

179188
continue;
@@ -196,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
196205
}
197206
}
198207

199-
std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
208+
//std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
200209
msgq_reset_reader(q);
201210
}
202211

@@ -278,8 +287,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
278287
// Notify readers
279288
for (uint64_t i = 0; i < num_readers; i++){
280289
uint64_t reader_uid = *q->read_uids[i];
281-
282-
syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1);
290+
thread_signal(reader_uid & 0xFFFFFFFF);
283291
}
284292

285293
return msg->size;

0 commit comments

Comments
 (0)