Skip to content

Commit d35515a

Browse files
committed
add all msgq files, but dont use as default
1 parent a68a38f commit d35515a

10 files changed

+879
-8
lines changed

messaging/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
demo
2+
bridge
23
*.o
34
*.d
45
*.a

messaging/Makefile

+20-7
Original file line numberDiff line numberDiff line change
@@ -31,33 +31,46 @@ endif
3131

3232
CXXFLAGS += $(ZMQ_FLAGS) $(YAML_FLAGS)
3333

34-
OBJS := messaging.o impl_zmq.o
34+
OBJS := messaging.o impl_zmq.o impl_msgq.o msgq.o
3535
DEPS=$(OBJS:.o=.d)
3636

3737
.PRECIOUS: $(OBJS)
3838
.PHONY: all clean
39-
all: messaging.a messaging_pyx.so
39+
all: bridge messaging.a messaging_pyx.so messaging.so
4040

4141
demo: messaging.a demo.o
4242
$(CC) $(LDFLAGS) $^ $(LDLIBS) -L. -l:messaging.a -o '$@'
4343

44+
bridge: messaging.a bridge.o
45+
$(CC) $(LDFLAGS) $^ $(LDLIBS) -L. -l:messaging.a -o '$@'
46+
4447
messaging_pyx.so: messaging.a messaging_pyx_setup.py messaging_pyx.pyx messaging.pxd
4548
python3 messaging_pyx_setup.py build_ext --inplace
4649
rm -rf build
4750
rm -f messaging_pyx.cpp
4851

52+
messaging.so: $(OBJS)
53+
@echo "[ LINK ] $@"
54+
mkdir -p libs_so; \
55+
cd libs_so; \
56+
ar -x $(ZMQ_LIBS); \
57+
ar -x $(YAML_LIB);
58+
59+
$(CXX) -shared $(LDFLAGS) $^ $(LDLIBS) libs_so/*.o -o '$@'
60+
rm -r libs_so
61+
4962
%.a: $(OBJS)
5063
@echo "[ LINK ] $@"
51-
mkdir -p libs; \
52-
cd libs; \
64+
mkdir -p libs_a; \
65+
cd libs_a; \
5366
ar -x $(ZMQ_LIBS); \
5467
ar -x $(YAML_LIB);
5568

56-
ar rcsD '$@' $^ libs/*.o
57-
rm -r libs
69+
ar rcsD '$@' $^ libs_a/*.o
70+
rm -r libs_a
5871

5972
clean:
6073
@echo "[ CLEAN ]"
61-
rm -rf *.so *.a demo libs $(OBJS) $(DEPS)
74+
rm -rf *.so *.a bridge demo libs_a libs_so $(OBJS) $(DEPS)
6275

6376
-include $(DEPS)

messaging/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def __init__(self, services, ignore_alive=None, addr="127.0.0.1"):
155155
def __getitem__(self, s):
156156
return self.data[s]
157157

158-
def update(self, timeout=-1):
158+
def update(self, timeout=1000):
159159
msgs = []
160160
for sock in self.poller.poll(timeout):
161161
msgs.append(recv_one(sock))

messaging/bridge.cc

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#include <iostream>
2+
#include <string>
3+
#include <cassert>
4+
#include <csignal>
5+
#include <map>
6+
7+
#pragma GCC diagnostic push
8+
#pragma GCC diagnostic ignored "-Wshadow"
9+
#include <yaml-cpp/yaml.h>
10+
#pragma GCC diagnostic pop
11+
12+
#include "impl_msgq.hpp"
13+
#include "impl_zmq.hpp"
14+
15+
void sigpipe_handler(int sig) {
16+
assert(sig == SIGPIPE);
17+
std::cout << "SIGPIPE received" << std::endl;
18+
}
19+
20+
static std::vector<std::string> get_services() {
21+
char * base_dir_ptr = std::getenv("BASEDIR");
22+
23+
if (base_dir_ptr == NULL){
24+
base_dir_ptr = std::getenv("PYTHONPATH");
25+
}
26+
27+
assert(base_dir_ptr);
28+
std::string base_dir = base_dir_ptr;
29+
std::string service_list_path = base_dir + "/selfdrive/service_list.yaml";
30+
YAML::Node service_list = YAML::LoadFile(service_list_path);
31+
32+
std::vector<std::string> name_list;
33+
34+
for (const auto& it : service_list) {
35+
auto name = it.first.as<std::string>();
36+
if (name == "plusFrame" || name == "uiLayoutState") continue;
37+
name_list.push_back(name);
38+
}
39+
40+
return name_list;
41+
42+
}
43+
44+
45+
int main(void){
46+
signal(SIGPIPE, (sighandler_t)sigpipe_handler);
47+
48+
auto endpoints = get_services();
49+
50+
std::map<SubSocket*, PubSocket*> sub2pub;
51+
52+
Context *zmq_context = new ZMQContext();
53+
Context *msgq_context = new MSGQContext();
54+
Poller *poller = new MSGQPoller();
55+
56+
for (auto endpoint: endpoints){
57+
SubSocket * msgq_sock = new MSGQSubSocket();
58+
msgq_sock->connect(msgq_context, endpoint, "127.0.0.1", false);
59+
poller->registerSocket(msgq_sock);
60+
61+
PubSocket * zmq_sock = new ZMQPubSocket();
62+
zmq_sock->connect(zmq_context, endpoint);
63+
64+
sub2pub[msgq_sock] = zmq_sock;
65+
}
66+
67+
68+
while (true){
69+
for (auto sub_sock : poller->poll(100)){
70+
Message * msg = sub_sock->receive();
71+
if (msg == NULL) continue;
72+
sub2pub[sub_sock]->sendMessage(msg);
73+
delete msg;
74+
}
75+
}
76+
return 0;
77+
}

messaging/impl_msgq.cc

+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
#include <cassert>
2+
#include <cstring>
3+
#include <iostream>
4+
#include <cstdlib>
5+
#include <csignal>
6+
#include <cerrno>
7+
8+
9+
#include "impl_msgq.hpp"
10+
11+
volatile sig_atomic_t msgq_do_exit = 0;
12+
13+
void sig_handler(int signal) {
14+
assert(signal == SIGINT || signal == SIGTERM);
15+
msgq_do_exit = 1;
16+
}
17+
18+
19+
MSGQContext::MSGQContext() {
20+
}
21+
22+
MSGQContext::~MSGQContext() {
23+
}
24+
25+
void MSGQMessage::init(size_t sz) {
26+
size = sz;
27+
data = new char[size];
28+
}
29+
30+
void MSGQMessage::init(char * d, size_t sz) {
31+
size = sz;
32+
data = new char[size];
33+
memcpy(data, d, size);
34+
}
35+
36+
void MSGQMessage::takeOwnership(char * d, size_t sz) {
37+
size = sz;
38+
data = d;
39+
}
40+
41+
void MSGQMessage::close() {
42+
if (size > 0){
43+
delete[] data;
44+
}
45+
size = 0;
46+
}
47+
48+
MSGQMessage::~MSGQMessage() {
49+
this->close();
50+
}
51+
52+
53+
void MSGQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
54+
assert(context);
55+
assert(address == "127.0.0.1");
56+
57+
q = new msgq_queue_t;
58+
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
59+
msgq_init_subscriber(q);
60+
61+
if (conflate){
62+
q->read_conflate = true;
63+
}
64+
65+
timeout = -1;
66+
67+
std::cout << "MSGQ SUB: " << endpoint << std::endl;
68+
}
69+
70+
71+
Message * MSGQSubSocket::receive(bool non_blocking){
72+
msgq_do_exit = 0;
73+
74+
void (*prev_handler_sigint)(int);
75+
void (*prev_handler_sigterm)(int);
76+
if (!non_blocking){
77+
prev_handler_sigint = std::signal(SIGINT, sig_handler);
78+
prev_handler_sigterm = std::signal(SIGTERM, sig_handler);
79+
}
80+
81+
msgq_msg_t msg;
82+
83+
MSGQMessage *r = NULL;
84+
r = NULL;
85+
86+
int rc = msgq_msg_recv(&msg, q);
87+
88+
// Hack to implement blocking read with a poller. Don't use this
89+
while (!non_blocking && rc == 0 && msgq_do_exit == 0){
90+
msgq_pollitem_t items[1];
91+
items[0].q = q;
92+
93+
int t = (timeout != -1) ? timeout : 100;
94+
95+
msgq_poll(items, 1, t);
96+
rc = msgq_msg_recv(&msg, q);
97+
98+
if (timeout != -1){
99+
break;
100+
}
101+
}
102+
103+
if (rc > 0){
104+
r = new MSGQMessage;
105+
r->takeOwnership(msg.data, msg.size);
106+
}
107+
errno = msgq_do_exit ? EINTR : 0;
108+
109+
if (!non_blocking){
110+
std::signal(SIGINT, prev_handler_sigint);
111+
std::signal(SIGTERM, prev_handler_sigterm);
112+
}
113+
114+
return (Message*)r;
115+
}
116+
117+
void MSGQSubSocket::setTimeout(int t){
118+
timeout = t;
119+
}
120+
121+
MSGQSubSocket::~MSGQSubSocket(){
122+
delete q;
123+
}
124+
125+
void MSGQPubSocket::connect(Context *context, std::string endpoint){
126+
assert(context);
127+
128+
q = new msgq_queue_t;
129+
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
130+
msgq_init_publisher(q);
131+
132+
std::cout << "MSGQ PUB: " << endpoint << std::endl;
133+
}
134+
135+
int MSGQPubSocket::sendMessage(Message *message){
136+
msgq_msg_t msg;
137+
msg.data = message->getData();
138+
msg.size = message->getSize();
139+
140+
return msgq_msg_send(&msg, q);
141+
}
142+
143+
int MSGQPubSocket::send(char *data, size_t size){
144+
msgq_msg_t msg;
145+
msg.data = data;
146+
msg.size = size;
147+
148+
return msgq_msg_send(&msg, q);
149+
}
150+
151+
MSGQPubSocket::~MSGQPubSocket(){
152+
delete q;
153+
}
154+
155+
156+
void MSGQPoller::registerSocket(SubSocket * socket){
157+
assert(num_polls + 1 < MAX_POLLERS);
158+
polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket();
159+
160+
sockets.push_back(socket);
161+
num_polls++;
162+
}
163+
164+
std::vector<SubSocket*> MSGQPoller::poll(int timeout){
165+
std::vector<SubSocket*> r;
166+
167+
msgq_poll(polls, num_polls, timeout);
168+
for (size_t i = 0; i < num_polls; i++){
169+
if (polls[i].revents){
170+
r.push_back(sockets[i]);
171+
}
172+
}
173+
174+
return r;
175+
}

messaging/impl_msgq.hpp

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#pragma once
2+
#include "messaging.hpp"
3+
#include "msgq.hpp"
4+
#include <zmq.h>
5+
#include <string>
6+
7+
#define MAX_POLLERS 128
8+
9+
class MSGQContext : public Context {
10+
private:
11+
void * context = NULL;
12+
public:
13+
MSGQContext();
14+
void * getRawContext() {return context;}
15+
~MSGQContext();
16+
};
17+
18+
class MSGQMessage : public Message {
19+
private:
20+
char * data;
21+
size_t size;
22+
public:
23+
void init(size_t size);
24+
void init(char *data, size_t size);
25+
void takeOwnership(char *data, size_t size);
26+
size_t getSize(){return size;}
27+
char * getData(){return data;}
28+
void close();
29+
~MSGQMessage();
30+
};
31+
32+
class MSGQSubSocket : public SubSocket {
33+
private:
34+
msgq_queue_t * q;
35+
int timeout;
36+
public:
37+
void connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
38+
void setTimeout(int timeout);
39+
void * getRawSocket() {return (void*)q;}
40+
Message *receive(bool non_blocking=false);
41+
~MSGQSubSocket();
42+
};
43+
44+
class MSGQPubSocket : public PubSocket {
45+
private:
46+
msgq_queue_t * q;
47+
public:
48+
void connect(Context *context, std::string endpoint);
49+
int sendMessage(Message *message);
50+
int send(char *data, size_t size);
51+
~MSGQPubSocket();
52+
};
53+
54+
class MSGQPoller : public Poller {
55+
private:
56+
std::vector<SubSocket*> sockets;
57+
msgq_pollitem_t polls[MAX_POLLERS];
58+
size_t num_polls = 0;
59+
60+
public:
61+
void registerSocket(SubSocket *socket);
62+
std::vector<SubSocket*> poll(int timeout);
63+
~MSGQPoller(){};
64+
};

0 commit comments

Comments
 (0)