Skip to content

Commit

Permalink
Implemented decoder with cantools but more testing required (#349)
Browse files Browse the repository at this point in the history
* Implemented decoder with cantools but more testing required

* Goodbye match case and removed linting

* Added config to decoder file
  • Loading branch information
EricGaoo authored Jan 16, 2025
1 parent 0cba033 commit f0c0718
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 0 deletions.
3 changes: 3 additions & 0 deletions py/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"no_lint": true
}
Empty file added py/decoder/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions py/decoder/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"no_lint": true
}
89 changes: 89 additions & 0 deletions py/decoder/decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os
import pty
import cantools
import serial


class State:
SOM = "SOM"
ID = "ID"
DLC = "DLC"
DATA = "DATA"
EOM = "EOM"
VALID = "VALID"


class DatagramDecoder:
def __init__(self):
self.message_state = State.SOM
self.buffer = []
self.datagram = None
self.init_serial()
self.init_dbc()

def init_serial(self):
self.master, self.slave = pty.openpty()
self.slave_name = os.ttyname(self.slave)
self.slave_serial = serial.Serial(self.slave_name)

def init_dbc(self):
dbc_path = "/Users/ericgao/box/shared/fwxv/libraries/codegen/system_can.dbc"
self.db = cantools.database.load_file(dbc_path)

def write(self, packet):
self.slave_serial.write(bytes(packet))

def read(self):
recv = os.read(self.master, 1000)
if self.parse_byte(recv):
message = self.db.get_message_by_frame_id(self.datagram["id"])
decoded_data = message.decode(self.datagram['data'])
print(decoded_data)

def read_test(self, byte):
if self.parse_byte(byte):
message = self.db.get_message_by_frame_id(self.datagram["id"])
decoded_data = message.decode(bytes(self.datagram['data']))
print(decoded_data)

def is_valid_id(self, id):
try:
self.db.get_message_by_frame_id(id)
return True
except KeyError:
return False

def parse_byte(self, byte):
if self.message_state == State.SOM or self.message_state == State.VALID:
self.buffer = []
self.datagram = None
if byte == 0xAA:
self.message_state = State.ID
elif self.message_state == State.ID:
self.buffer.append(byte)
if len(self.buffer) == 4:
message_id = int.from_bytes(self.buffer, byteorder='big')
if self.is_valid_id(message_id):
self.datagram = {"id": message_id}
self.buffer = []
self.message_state = State.DLC
else:
self.message_state = State.SOM
elif self.message_state == State.DLC:
self.datagram["dlc"] = byte
if self.datagram["dlc"] <= 9:
self.datagram["data"] = []
self.message_state = State.DATA
else:
self.message_state = State.SOM
elif self.message_state == State.DATA:
self.buffer.append(byte)
if len(self.buffer) == self.datagram["dlc"]:
self.datagram["data"] = self.buffer
self.message_state = State.EOM
elif self.message_state == State.EOM:
if byte == 0xBB:
self.message_state = State.VALID
else:
self.message_state = State.SOM
return self.message_state == State.VALID
23 changes: 23 additions & 0 deletions py/decoder/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from decoder import DatagramDecoder

decoder = DatagramDecoder()

invalid_msg = [0xAA, 0xFF, 0x00, 0x00, 0x00, 0x01, 0x06, 0xAA, 0x01, 0x01, 0x01, 0x01, 0x01, 0xBB]

for byte in invalid_msg:
decoder.read_test(byte)

valid_msg = [0xAA, 0x00, 0x00, 0x00, 0x01, 0x07, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0xBB]

for byte in valid_msg:
decoder.read_test(byte)

invalid_msg = [0xAA, 0x00, 0x00, 0x00, 0x01, 0x07, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0xAA]

for byte in invalid_msg:
decoder.read_test(byte)

valid_msg = [0xAA, 0x00, 0x00, 0x00, 0x01, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xBB]

for byte in valid_msg:
decoder.read_test(byte)

0 comments on commit f0c0718

Please sign in to comment.