Skip to content

Commit

Permalink
Messaging unit tests (commaai#66)
Browse files Browse the repository at this point in the history
* pub sub unit test

* pub master test

* first submaster test case

* test init

* submaster conflate test

* more submaster

* all submaster except alive and valid

* fix ZMQ

* fix ZMQ for test_messaging

* single underscore

* fix key error

* more zmq sleep

* zmq needs even more sleep

* unused

* zmq sleep in submaster tests
  • Loading branch information
adeebshihadeh authored Jul 27, 2020
1 parent c176b01 commit 45cd21a
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 0 deletions.
82 changes: 82 additions & 0 deletions messaging/tests/test_messaging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
import os
import time
import random
import unittest

from cereal import log
import cereal.messaging as messaging
from cereal.services import service_list

events = [evt for evt in log.Event.schema.union_fields if evt in service_list.keys()]

def random_sock():
return random.choice(events)

def random_socks(num_socks=10):
return list(set([random_sock() for _ in range(num_socks)]))

def random_bytes(length=1000):
return bytes([random.randrange(0xFF) for _ in range(length)])

def zmq_sleep(t=1):
if "ZMQ" in os.environ:
time.sleep(t)

# TODO: test both msgq and zmq

class TestPubSubSockets(unittest.TestCase):

def setUp(self):
# ZMQ pub socket takes too long to die
# sleep to prevent multiple publishers error between tests
zmq_sleep()

def test_pub_sub(self):
sock = random_sock()
pub_sock = messaging.pub_sock(sock)
sub_sock = messaging.sub_sock(sock, conflate=False, timeout=None)
zmq_sleep(3)

for _ in range(1000):
msg = random_bytes()
pub_sock.send(msg)
recvd = sub_sock.receive()
self.assertEqual(msg, recvd)

def test_conflate(self):
sock = random_sock()
pub_sock = messaging.pub_sock(sock)
for conflate in [True, False]:
for _ in range(10):
num_msgs = random.randint(3, 10)
sub_sock = messaging.sub_sock(sock, conflate=conflate, timeout=None)
zmq_sleep()

sent_msgs = []
for __ in range(num_msgs):
msg = random_bytes()
pub_sock.send(msg)
sent_msgs.append(msg)
time.sleep(0.1)
recvd_msgs = messaging.drain_sock_raw(sub_sock)
if conflate:
self.assertEqual(len(recvd_msgs), 1)
else:
# TODO: compare actual data
self.assertEqual(len(recvd_msgs), len(sent_msgs))

def test_receive_timeout(self):
sock = random_sock()
for _ in range(10):
timeout = random.randrange(200)
sub_sock = messaging.sub_sock(sock, timeout=timeout)
zmq_sleep()

start_time = time.monotonic()
recvd = sub_sock.receive()
self.assertLess(time.monotonic() - start_time, 0.2)
assert recvd is None

if __name__ == "__main__":
unittest.main()
159 changes: 159 additions & 0 deletions messaging/tests/test_pub_sub_master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#!/usr/bin/env python3
import numbers
import random
import time
import unittest

from cereal import car
import cereal.messaging as messaging
from cereal.messaging.tests.test_messaging import events, random_sock, random_socks, \
random_bytes, zmq_sleep


# TODO: this should take any capnp struct and returrn a msg with random populated data
def random_carstate():
fields = ["vEgo", "aEgo", "gas", "steeringAngle"]
msg = messaging.new_message("carState")
cs = msg.carState
for f in fields:
setattr(cs, f, random.random() * 10)
return msg

# TODO: this should compare any capnp structs
def assert_carstate(cs1, cs2):
for f in car.CarState.schema.non_union_fields:
# TODO: check all types
val1, val2 = getattr(cs1, f), getattr(cs2, f)
if isinstance(val1, numbers.Number):
assert val1 == val2, f"{f}: sent '{val1}' vs recvd '{val2}'"


class TestSubMaster(unittest.TestCase):

def setUp(self):
# ZMQ pub socket takes too long to die
# sleep to prevent multiple publishers error between tests
zmq_sleep(3)

def test_init(self):
sm = messaging.SubMaster(events)
for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive,
sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]:
self.assertEqual(len(p), len(events))

def test_init_state(self):
socks = random_socks()
sm = messaging.SubMaster(socks)
self.assertEqual(sm.frame, -1)
self.assertFalse(any(sm.updated.values()))
self.assertFalse(any(sm.alive.values()))
self.assertTrue(all(t == 0. for t in sm.rcv_time.values()))
self.assertTrue(all(f == 0 for f in sm.rcv_frame.values()))
self.assertTrue(all(t == 0 for t in sm.logMonoTime.values()))

for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive,
sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]:
self.assertEqual(len(p), len(socks))

def test_getitem(self):
sock = "carState"
pub_sock = messaging.pub_sock(sock)
sm = messaging.SubMaster([sock,])
zmq_sleep()

msg = random_carstate()
pub_sock.send(msg.to_bytes())
sm.update(1000)
assert_carstate(msg.carState, sm[sock])

# TODO: break this test up to individually test SubMaster.update and SubMaster.update_msgs
def test_update(self):
sock = "carState"
pub_sock = messaging.pub_sock(sock)
sm = messaging.SubMaster([sock,])
zmq_sleep()

for i in range(10):
msg = messaging.new_message(sock)
pub_sock.send(msg.to_bytes())
sm.update(1000)
self.assertEqual(sm.frame, i)
self.assertTrue(all(sm.updated.values()))

def test_update_timeout(self):
sock = random_sock()
sm = messaging.SubMaster([sock,])
for _ in range(5):
timeout = random.randrange(1000, 5000)
start_time = time.monotonic()
sm.update(timeout)
t = time.monotonic() - start_time
self.assertGreaterEqual(t, timeout/1000.)
self.assertLess(t, 5)
self.assertFalse(any(sm.updated.values()))

def test_alive(self):
pass

def test_ignore_alive(self):
pass

def test_valid(self):
pass

# SubMaster should always conflate
def test_conflate(self):
sock = "carState"
pub_sock = messaging.pub_sock(sock)
sm = messaging.SubMaster([sock,])

n = 10
for i in range(n+1):
msg = messaging.new_message(sock)
msg.carState.vEgo = i
pub_sock.send(msg.to_bytes())
time.sleep(0.01)
sm.update(1000)
self.assertEqual(sm[sock].vEgo, n)


class TestPubMaster(unittest.TestCase):

def setUp(self):
# ZMQ pub socket takes too long to die
# sleep to prevent multiple publishers error between tests
zmq_sleep(3)

def test_init(self):
messaging.PubMaster(events)

def test_send(self):
socks = random_socks()
pm = messaging.PubMaster(socks)
sub_socks = {s: messaging.sub_sock(s, conflate=True, timeout=1000) for s in socks}
zmq_sleep()

# PubMaster accepts either a capnp msg builder or bytes
for capnp in [True, False]:
for i in range(100):
sock = socks[i % len(socks)]

if capnp:
try:
msg = messaging.new_message(sock)
except Exception:
msg = messaging.new_message(sock, random.randrange(50))
else:
msg = random_bytes()

pm.send(sock, msg)
recvd = sub_socks[sock].receive()

if capnp:
msg.clear_write_flag()
msg = msg.to_bytes()
self.assertEqual(msg, recvd, i)


if __name__ == "__main__":
unittest.main()

0 comments on commit 45cd21a

Please sign in to comment.