Skip to content
This repository was archived by the owner on Apr 13, 2024. It is now read-only.

Commit c6b5c73

Browse files
authored
Switch default to msgq (#21)
* switch default to msgq * SIGUSR1 is already used by the apks * Don't return message upstream when exiting * Remove debug print * Remove more debug print
1 parent a457ffa commit c6b5c73

File tree

3 files changed

+30
-25
lines changed

3 files changed

+30
-25
lines changed

messaging/impl_msgq.cc

+11-6
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){
8585
msgq_msg_t msg;
8686

8787
MSGQMessage *r = NULL;
88-
r = NULL;
8988

9089
int rc = msgq_msg_recv(&msg, q);
9190

@@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){
109108
}
110109
}
111110

112-
if (rc > 0){
113-
r = new MSGQMessage;
114-
r->takeOwnership(msg.data, msg.size);
115-
}
116-
errno = msgq_do_exit ? EINTR : 0;
117111

118112
if (!non_blocking){
119113
std::signal(SIGINT, prev_handler_sigint);
120114
std::signal(SIGTERM, prev_handler_sigterm);
121115
}
122116

117+
errno = msgq_do_exit ? EINTR : 0;
118+
119+
if (rc > 0){
120+
if (msgq_do_exit){
121+
msgq_msg_close(&msg); // Free unused message on exit
122+
} else {
123+
r = new MSGQMessage;
124+
r->takeOwnership(msg.data, msg.size);
125+
}
126+
}
127+
123128
return (Message*)r;
124129
}
125130

messaging/messaging.cc

+12-12
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,20 @@
44

55
Context * Context::create(){
66
Context * c;
7-
if (std::getenv("MSGQ")){
8-
c = new MSGQContext();
9-
} else {
7+
if (std::getenv("ZMQ")){
108
c = new ZMQContext();
9+
} else {
10+
c = new MSGQContext();
1111
}
1212
return c;
1313
}
1414

1515
SubSocket * SubSocket::create(){
1616
SubSocket * s;
17-
if (std::getenv("MSGQ")){
18-
s = new MSGQSubSocket();
19-
} else {
17+
if (std::getenv("ZMQ")){
2018
s = new ZMQSubSocket();
19+
} else {
20+
s = new MSGQSubSocket();
2121
}
2222
return s;
2323
}
@@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri
6060

6161
PubSocket * PubSocket::create(){
6262
PubSocket * s;
63-
if (std::getenv("MSGQ")){
64-
s = new MSGQPubSocket();
65-
} else {
63+
if (std::getenv("ZMQ")){
6664
s = new ZMQPubSocket();
65+
} else {
66+
s = new MSGQPubSocket();
6767
}
6868
return s;
6969
}
@@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){
8282

8383
Poller * Poller::create(){
8484
Poller * p;
85-
if (std::getenv("MSGQ")){
86-
p = new MSGQPoller();
87-
} else {
85+
if (std::getenv("ZMQ")){
8886
p = new ZMQPoller();
87+
} else {
88+
p = new MSGQPoller();
8989
}
9090
return p;
9191
}

messaging/msgq.cc

+7-7
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323

2424
#include "msgq.hpp"
2525

26-
void sigusr1_handler(int signal) {
27-
assert(signal == SIGUSR1);
26+
void sigusr2_handler(int signal) {
27+
assert(signal == SIGUSR2);
2828
}
2929

3030
uint64_t msgq_get_uid(void){
@@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
8080
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
8181
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
8282

83-
std::signal(SIGUSR1, sigusr1_handler);
83+
std::signal(SIGUSR2, sigusr2_handler);
8484

8585
const char * prefix = "/dev/shm/";
8686
char * full_path = new char[strlen(path) + strlen(prefix) + 1];
@@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){
136136

137137

138138
void msgq_init_publisher(msgq_queue_t * q) {
139-
std::cout << "Starting publisher" << std::endl;
139+
//std::cout << "Starting publisher" << std::endl;
140140
uint64_t uid = msgq_get_uid();
141141

142142
*q->write_uid = uid;
@@ -153,9 +153,9 @@ void msgq_init_publisher(msgq_queue_t * q) {
153153
static void thread_signal(uint32_t tid) {
154154
#ifndef SYS_tkill
155155
// TODO: this won't work for multithreaded programs
156-
kill(tid, SIGUSR1);
156+
kill(tid, SIGUSR2);
157157
#else
158-
syscall(SYS_tkill, tid, SIGUSR1);
158+
syscall(SYS_tkill, tid, SIGUSR2);
159159
#endif
160160
}
161161

@@ -205,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
205205
}
206206
}
207207

208-
std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
208+
//std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
209209
msgq_reset_reader(q);
210210
}
211211

0 commit comments

Comments
 (0)