Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor CANParser to improve performance #795

Merged
merged 30 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ jobs:
unit-tests:
name: unit tests
runs-on: ubuntu-20.04
#strategy:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deanlee in the future, you can uncomment this to run it several times

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok!

# fail-fast: false
# matrix:
# run: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
steps:
- uses: actions/checkout@v3
- name: Build Docker image
Expand Down
3 changes: 2 additions & 1 deletion can/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ class CANParser {
CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter);
#ifndef DYNAMIC_CAPNP
void update_string(const std::string &data, bool sendcan);
void update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan);
void UpdateCans(uint64_t sec, const capnp::List<cereal::CanData>::Reader& cans);
#endif
void UpdateCans(uint64_t sec, const capnp::DynamicStruct::Reader& cans);
void UpdateValid(uint64_t sec);
std::vector<SignalValue> query_latest();
void query_latest(std::vector<SignalValue> &vals, uint64_t last_ts = 0);
};

class CANPacker {
Expand Down
3 changes: 1 addition & 2 deletions can/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ cdef extern from "common.h":
bool can_valid
bool bus_timeout
CANParser(int, string, vector[MessageParseOptions], vector[SignalParseOptions])
void update_string(string&, bool)
vector[SignalValue] query_latest()
void update_strings(vector[string]&, vector[SignalValue]&, bool)

cdef cppclass CANPacker:
CANPacker(string)
Expand Down
31 changes: 21 additions & 10 deletions can/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int64_t get_raw_value(const std::vector<uint8_t> &msg, const Signal &sig) {

bool MessageState::parse(uint64_t sec, const std::vector<uint8_t> &dat) {
for (int i = 0; i < parse_sigs.size(); i++) {
auto &sig = parse_sigs[i];
const auto &sig = parse_sigs[i];

int64_t tmp = get_raw_value(dat, sig);
if (sig.is_signed) {
Expand Down Expand Up @@ -204,14 +204,24 @@ void CANParser::update_string(const std::string &data, bool sendcan) {
UpdateValid(last_sec);
}

void CANParser::update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan) {
uint64_t current_sec = 0;
for (const auto &d : data) {
update_string(d, sendcan);
if (current_sec == 0) {
current_sec = last_sec;
}
}
query_latest(vals, current_sec);
}

void CANParser::UpdateCans(uint64_t sec, const capnp::List<cereal::CanData>::Reader& cans) {
//DEBUG("got %d messages\n", cans.size());

bool bus_empty = true;

// parse the messages
for (int i = 0; i < cans.size(); i++) {
auto cmsg = cans[i];
for (const auto cmsg : cans) {
if (cmsg.getSrc() != bus) {
// DEBUG("skip %d: wrong bus\n", cmsg.getAddress());
continue;
Expand Down Expand Up @@ -301,16 +311,19 @@ void CANParser::UpdateValid(uint64_t sec) {
can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid;
}

std::vector<SignalValue> CANParser::query_latest() {
std::vector<SignalValue> ret;

void CANParser::query_latest(std::vector<SignalValue> &vals, uint64_t last_ts) {
if (last_ts == 0) {
last_ts = last_sec;
}
for (auto& kv : message_states) {
auto& state = kv.second;
if (last_sec != 0 && state.last_seen_nanos != last_sec) continue;
if (last_ts != 0 && state.last_seen_nanos < last_ts) {
continue;
}

for (int i = 0; i < state.parse_sigs.size(); i++) {
const Signal &sig = state.parse_sigs[i];
SignalValue &v = ret.emplace_back();
SignalValue &v = vals.emplace_back();
v.address = state.address;
v.ts_nanos = state.last_seen_nanos;
v.name = sig.name;
Expand All @@ -319,6 +332,4 @@ std::vector<SignalValue> CANParser::query_latest() {
state.all_vals[i].clear();
}
}

return ret;
}
45 changes: 15 additions & 30 deletions can/parser_pyx.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# distutils: language = c++
# cython: c_string_encoding=ascii, language_level=3

from cython.operator cimport dereference as deref, preincrement as preinc
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.unordered_set cimport unordered_set
Expand Down Expand Up @@ -101,47 +102,31 @@ cdef class CANParser:
message_options_v.push_back(mpo)

self.can = new cpp_CANParser(bus, dbc_name, message_options_v, signal_options_v)
self.update_vl()
self.update_strings([])

cdef unordered_set[uint32_t] update_vl(self):
def update_strings(self, strings, sendcan=False):
for v in self.vl_all.values():
for l in v.values():
l.clear()

cdef vector[SignalValue] new_vals
cdef unordered_set[uint32_t] updated_addrs

new_vals = self.can.query_latest()
for cv in new_vals:
self.can.update_strings(strings, new_vals, sendcan)
cdef vector[SignalValue].iterator it = new_vals.begin()
cdef SignalValue* cv
while it != new_vals.end():
cv = &deref(it)
# Cast char * directly to unicode
cv_name = <unicode>cv.name
self.vl[cv.address][cv_name] = cv.value
self.vl_all[cv.address][cv_name] = cv.all_values
self.ts_nanos[cv.address][cv_name] = cv.ts_nanos

vl_all = self.vl_all[cv.address]
if (cv_name in vl_all):
vl_all[cv_name].extend(cv.all_values)
else:
vl_all[cv_name] = cv.all_values

updated_addrs.insert(cv.address)
preinc(it)

return updated_addrs

def update_string(self, dat, sendcan=False):
for v in self.vl_all.values():
for l in v.values():
l.clear()

self.can.update_string(dat, sendcan)
return self.update_vl()

def update_strings(self, strings, sendcan=False):
for v in self.vl_all.values():
for l in v.values():
l.clear()

updated_addrs = set()
for s in strings:
self.can.update_string(s, sendcan)
updated_addrs.update(self.update_vl())
return updated_addrs

@property
def can_valid(self):
return self.can.can_valid
Expand Down
20 changes: 10 additions & 10 deletions can/tests/test_packer_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_packer_counter(self):
for i in range(1000):
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
dat = can_list_to_can_capnp([msg, ])
parser.update_string(dat)
parser.update_strings([dat])
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], i % 256)

# setting COUNTER should override
Expand All @@ -67,15 +67,15 @@ def test_packer_counter(self):
"COUNTER": cnt,
})
dat = can_list_to_can_capnp([msg, ])
parser.update_string(dat)
parser.update_strings([dat])
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], cnt)

# then, should resume counting from the override value
cnt = parser.vl["CAN_FD_MESSAGE"]["COUNTER"]
for i in range(100):
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
dat = can_list_to_can_capnp([msg, ])
parser.update_string(dat)
parser.update_strings([dat])
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], (cnt + i) % 256)

def test_parser_can_valid(self):
Expand All @@ -92,15 +92,15 @@ def test_parser_can_valid(self):
# not valid until the message is seen
for _ in range(100):
dat = can_list_to_can_capnp([])
parser.update_string(dat)
parser.update_strings([dat])
self.assertFalse(parser.can_valid)

# valid once seen
for i in range(1, 100):
t = int(0.01 * i * 1e9)
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
dat = can_list_to_can_capnp([msg, ], logMonoTime=t)
parser.update_string(dat)
parser.update_strings([dat])
self.assertTrue(parser.can_valid)

def test_packer_parser(self):
Expand Down Expand Up @@ -141,7 +141,7 @@ def test_packer_parser(self):

msgs = [packer.make_can_msg(k, 0, v) for k, v in values.items()]
bts = can_list_to_can_capnp(msgs)
parser.update_string(bts)
parser.update_strings([bts])

for k, v in values.items():
for key, val in v.items():
Expand All @@ -168,7 +168,7 @@ def test_scale_offset(self):
msgs = packer.make_can_msg("VSA_STATUS", 0, values)
bts = can_list_to_can_capnp([msgs])

parser.update_string(bts)
parser.update_strings([bts])

self.assertAlmostEqual(parser.vl["VSA_STATUS"]["USER_BRAKE"], brake)

Expand Down Expand Up @@ -199,7 +199,7 @@ def test_subaru(self):

msgs = packer.make_can_msg("ES_LKAS", 0, values)
bts = can_list_to_can_capnp([msgs])
parser.update_string(bts)
parser.update_strings([bts])

self.assertAlmostEqual(parser.vl["ES_LKAS"]["LKAS_Output"], steer)
self.assertAlmostEqual(parser.vl["ES_LKAS"]["LKAS_Request"], active)
Expand Down Expand Up @@ -306,8 +306,8 @@ def test_timestamp_nanos(self):
for _ in range(10):
can_strings = []
log_mono_time = 0
for _ in range(10):
log_mono_time = int(random.uniform(1, 60) * 1e+9)
for i in range(10):
log_mono_time = int(0.01 * i * 1e+9)
can_msg = packer.make_can_msg("VSA_STATUS", 0, {})
can_strings.append(can_list_to_can_capnp([can_msg], logMonoTime=log_mono_time))
parser.update_strings(can_strings)
Expand Down
14 changes: 6 additions & 8 deletions can/tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from opendbc.can.tests.test_packer_parser import can_list_to_can_capnp



class TestParser(unittest.TestCase):
def _benchmark(self, signals, checks, thresholds, n):
parser = CANParser('toyota_new_mc_pt_generated', signals, checks, 0, False)
Expand All @@ -33,15 +32,14 @@ def _benchmark(self, signals, checks, thresholds, n):
else:
t1 = time.process_time_ns()
for m in can_msgs:
parser.update_string(m)
parser.update_strings([m])
t2 = time.process_time_ns()

ets.append(t2 - t1)

et = sum(ets) / len(ets)
avg_nanos = et / len(can_msgs)
method = 'update_strings' if n > 1 else 'update_string'
print('%s: [%s] %.1fms to parse %s, avg: %dns' % (self._testMethodName, method, et/1e6, len(can_msgs), avg_nanos))
print('%s: [%d] %.1fms to parse %s, avg: %dns' % (self._testMethodName, n, et/1e6, len(can_msgs), avg_nanos))

minn, maxx = thresholds
self.assertLess(avg_nanos, maxx)
Expand All @@ -51,8 +49,8 @@ def test_performance_one_signal(self):
signals = [
("ACCEL_CMD", "ACC_CONTROL"),
]
self._benchmark(signals, [('ACC_CONTROL', 10)], (5000, 7000), 1)
self._benchmark(signals, [('ACC_CONTROL', 10)], (2200, 3300), 10)
self._benchmark(signals, [('ACC_CONTROL', 10)], (4000, 18000), 1)
self._benchmark(signals, [('ACC_CONTROL', 10)], (700, 3000), 10)

def test_performance_all_signals(self):
signals = [
Expand All @@ -70,8 +68,8 @@ def test_performance_all_signals(self):
("ACCEL_CMD_ALT", "ACC_CONTROL"),
("CHECKSUM", "ACC_CONTROL"),
]
self._benchmark(signals, [('ACC_CONTROL', 10)], (12000, 19000), 1)
self._benchmark(signals, [('ACC_CONTROL', 10)], (7000, 13000), 10)
self._benchmark(signals, [('ACC_CONTROL', 10)], (10000, 19000), 1)
self._benchmark(signals, [('ACC_CONTROL', 10)], (1300, 5000), 10)


if __name__ == "__main__":
Expand Down