Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ce-oem] Add rs485 specific args (New) #1578

Merged
merged 10 commits into from
Nov 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,43 @@
import argparse


def print_ports_config(string: str):
def print_ports_config(string: str, rs485_conf: str = None):
ports_config_list = string.split()
rs485_conf_lists = {}
"""
Parse RS485 config,
e.g.
Input:
RS485_CONFIG = "/dev/ttySC0:True:False:0.0:0.0
/dev/ttySC2:True:False:0.0:0.0"

Output:
rs485_conf_lists = {
"/dev/ttySC0": {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx: 0.0,
"delay_before_rx: 0.0,
}
"/dev/ttySC2": {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx: 0.0,
"delay_before_rx: 0.0,
}
}
"""
if rs485_conf:
for rs485_conf_list in rs485_conf.split():
node, rts_tx, rts_rx, delay_tx, delay_rx = rs485_conf_list.split(
":"
)
rs485_conf_lists[node] = {
"rts_level_for_tx": rts_tx,
"rts_level_for_rx": rts_rx,
"delay_before_tx": delay_tx,
"delay_before_rx": delay_rx,
}
serials = []
rs485_nodes = []
rs422_nodes = []
Expand All @@ -19,8 +54,26 @@ def print_ports_config(string: str):
serial = {}
port_type, port_node, baud_rate = config_parts
serial["type"] = port_type

"""
Init a config dict if type is RS485, and the init value are refer
to the serial.rs485 module.
ref:
https://pyserial.readthedocs.io/en/latest/pyserial_api.html#serial.rs485.RS485Settings
"""
if port_type == "RS485":
serial["rs485_conf"] = {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx": 0.0,
"delay_before_rx": 0.0,
rickwu666666 marked this conversation as resolved.
Show resolved Hide resolved
}
serial["node"] = port_node
serial["baudrate"] = baud_rate

# Mapping rs485 configs with rs485 node name and update the config
if port_node in rs485_conf_lists.keys():
serial["rs485_conf"] = rs485_conf_lists[port_node]
serials.append(serial)
if port_type == "RS485":
rs485_nodes.append(port_node)
Expand All @@ -30,6 +83,9 @@ def print_ports_config(string: str):
for serial in serials:
print("type: {}".format(serial["type"]))
print("node: {}".format(serial["node"]))
if serial["type"] == "RS485":
for key, value in serial["rs485_conf"].items():
print("{}: {}".format(key, value))
print("baudrate: {}".format(serial["baudrate"]))
print("group: ", end="")
if serial["type"] == "RS485":
Expand All @@ -50,8 +106,15 @@ def main():
type=str,
help="The string needed to be parsed",
)
parser.add_argument(
"--rs485-conf",
type=str,
help="RS485 specific configurations.",
default=None,
required=False,
)
args = parser.parse_args()
print_ports_config(args.string)
print_ports_config(args.string, args.rs485_conf)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import random
import string
import serial.rs485


def init_logger():
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
stopbits: int = serial.STOPBITS_ONE,
timeout: int = 3,
data_size: int = 1024,
rs485_settings: dict = None,
) -> None:
self.node = node
self.type = type
Expand All @@ -73,10 +75,14 @@ def __init__(
self.stopbits = stopbits
self.timeout = timeout
self.data_size = data_size
self.rs485_settings = rs485_settings
self.ser = self.serial_init(node)
self.group = []
for ser in group:
self.group.append(self.serial_init(ser))
try:
self.group.append(self.serial_init(ser))
except Exception:
raise SystemError("Failed to init serial port: {}".format(ser))

def serial_init(self, node: str) -> serial.Serial:
"""Create a serial.Serial object based on the class variables"""
Expand All @@ -88,6 +94,13 @@ def serial_init(self, node: str) -> serial.Serial:
stopbits=self.stopbits,
timeout=self.timeout,
)
if self.type == "RS485":
ser.rs485_mode = serial.rs485.RS485Settings(
rts_level_for_tx=self.rs485_settings.get("rts_level_for_tx"),
rts_level_for_rx=self.rs485_settings.get("rts_level_for_rx"),
delay_before_tx=self.rs485_settings.get("delay_before_tx"),
delay_before_rx=self.rs485_settings.get("delay_before_rx"),
)
ser.reset_input_buffer()
ser.reset_output_buffer()
return ser
Expand Down Expand Up @@ -186,7 +199,7 @@ def console_mode(ser: Serial):
raise SystemExit(1)


def main():
def create_args():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
Expand Down Expand Up @@ -261,8 +274,61 @@ def main():
help="Timeout to receive",
default=3,
)

# Create RS485 subparser that only activates when --type=RS485
rs485_group = parser.add_argument_group(
"RS485 Options", "RS485-specific configuration options"
)
rs485_group.add_argument(
"--rts-level-for-tx",
choices=["True", "False"],
type=str,
help="RTS level for transmission." "Equal to RTS_ON_SEND",
default="True",
required=False,
)
rs485_group.add_argument(
"--rts-level-for-rx",
choices=["True", "False"],
type=str,
help="RTS level for reception." "Equal to RTS_AFTER_SEND",
default="False",
required=False,
)
rs485_group.add_argument(
"--rts-delay-before-tx",
type=float,
help="Delay after setting RTS but before transmission starts.",
default=0.0,
required=False,
)
rs485_group.add_argument(
"--rts-delay-before-rx",
type=float,
help="Delay after transmission ends and resetting RTS.",
default=0.0,
required=False,
)
return parser


def main():
parser = create_args()
args = parser.parse_args()

init_logger()
rs485_settings = {}
if args.type == "RS485":
rs485_settings = {
"rts_level_for_tx": (
True if args.rts_level_for_tx == "True" else False
),
"rts_level_for_rx": (
True if args.rts_level_for_rx == "True" else False
),
"delay_before_tx": args.rts_delay_before_tx,
"delay_before_rx": args.rts_delay_before_rx,
}
ser = Serial(
args.node,
args.type,
Expand All @@ -273,6 +339,7 @@ def main():
stopbits=args.stopbits,
timeout=args.timeout,
data_size=args.datasize,
rs485_settings=rs485_settings,
)

if args.mode == "server":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ _summary:
Generates a serial console resource based on user supplied configuration
_description:
A serial console resource that relies on the user
specifying the number of serial console port.
specifying the number of serial console port.
This is to allow template jobs to then be instantiated.
TYPE:NODE:BAUDRATE
SERIAL_CONSOLE_PORTS=USB:/dev/ttyUSB1:115200
Expand All @@ -24,11 +24,11 @@ template-engine: jinja2
template-id: ce-oem-serial/serial-console-tests
id: ce-oem-serial/serial-console-{{ type }}-{{ node }}-{{ baudrate }}
imports: from com.canonical.plainbox import manifest
requires:
requires:
manifest.has_serial_console_loopback == 'True'
_template-summary: To check if the serial ports can work as a console
_summary: To check if the serial port {{ type }} ({{ node }}) can work as a console
_purpose:
_purpose:
To check the serial port {{ type }} ({{ node }}) can work as a console.
_description:
Have to connect the serial port back to itself
Expand All @@ -46,19 +46,22 @@ _summary:
Generates a serial resource based on user supplied configuration
_description:
A serial resource that relies on the user
specifying the number of serial port.
specifying the number of serial port.
This is to allow template jobs to then be instantiated.
TYPE:NODE:BAUDRATE
SERIAL_PORTS="RS485:/dev/ttyS0:9600 RS485:/dev/ttyS1:9600 RS232:/dev/ttyS2:115200"
For RS485 specific configuration:
NODE:rts_level_for_tx{True|False}:rts_level_for_rx{True|False}:delay_before_tx{float}:delay_before_rx{float}
RS485_CONFIG="/dev/ttySC0:True:False:0.0:0.0"
plugin: resource
estimated_duration: 1.0
environ:
SERIAL_PORTS
SERIAL_PORTS RS485_CONFIG
command:
if [ -z "$SERIAL_PORTS" ]; then
exit 0
fi
serial_config_parser.py "$SERIAL_PORTS"
serial_config_parser.py "$SERIAL_PORTS" --rs485-conf "$RS485_CONFIG"

unit: template
template-resource: ce-oem-serial/serial-list
Expand All @@ -67,11 +70,11 @@ template-engine: jinja2
template-id: ce-oem-serial/serial-transmit-data-tests
id: ce-oem-serial/serial-transmit-data-{{ type }}-{{ node }}-{{ baudrate }}
imports: from com.canonical.plainbox import manifest
requires:
requires:
manifest.has_serial_ehco_server == 'True'
_template-summary:
Transmit data via serial ports
_purpose:
_purpose:
To check the serial port {{ type }} ({{ node }}) can transmit
data with baudate {{ baudrate }}
_description:
Expand All @@ -82,4 +85,9 @@ category_id: com.canonical.certification::serial
estimated_duration: 30
flags: also-after-suspend
command:
serial_test.py {{ node }} --mode client --type {{ type }} --group {{ group }} --baudrate {{ baudrate }}
# shellcheck disable=SC2050
if [ {{ type }} == "RS485" ]; then
serial_test.py {{ node }} --mode client --type {{ type }} --group {{ group }} --baudrate {{ baudrate }} --rts-level-for-tx {{ rts_level_for_tx }} --rts-level-for-rx {{ rts_level_for_rx }} --rts-delay-before-tx {{ delay_before_tx }} --rts-delay-before-rx {{ delay_before_rx }}
else
serial_test.py {{ node }} --mode client --type {{ type }} --group {{ group }} --baudrate {{ baudrate }}
fi
Loading