From 50e273bfa883d7dbdf611a41bc10b2140cdc89b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20D=C4=85browski?= <51504507+mdabrowski1990@users.noreply.github.com> Date: Mon, 29 Jul 2024 11:58:41 +0200 Subject: [PATCH] Configuration of Flow Control packets generation (#273) Add generator of Flow Control parameters that would be used to generate Flow Control packets during message reception. --- .github/workflows/testing.yml | 23 ++--- tests/software_tests/can/test_flow_control.py | 84 ++++++++++++++++- .../test_can_transport_interface.py | 35 ++++++-- uds/can/__init__.py | 9 +- uds/can/flow_control.py | 89 ++++++++++++++++++- .../can_transport_interface.py | 32 ++++++- 6 files changed, 244 insertions(+), 28 deletions(-) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index de8bef21..2161d16b 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -65,18 +65,14 @@ jobs: run: | pip install .[test] - - name: Prepare coverage uploader [CodeCov] - run: | - curl -Os https://app.codecov.io/gh/mdabrowski1990/uds/uploader/linux/codecov - chmod +x codecov - - name: Execute unit tests [pytest] run: | pytest tests/software_tests --cov-report=xml --cov=uds -m "not integration and not performance" - name: Upload unit tests report [CodeCov] - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: + fail_ci_if_error: true token: ${{ secrets.CODECOV_TOKEN }} files: coverage.xml flags: unit-tests @@ -86,23 +82,14 @@ jobs: pytest tests/software_tests --cov-report=xml --cov=uds -m "integration" - name: Upload integration tests report [CodeCov] - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: + fail_ci_if_error: true token: ${{ secrets.CODECOV_TOKEN }} files: coverage.xml flags: integration-tests -# TODO: uncomment when performance tests are added -# - name: Execute performance tests [pytest] -# run: | -# pytest tests/software_tests --cov-report=xml --cov=uds -m "performance" -# -# - name: Upload performance tests report [CodeCov] -# uses: codecov/codecov-action@v3 -# with: -# token: ${{ secrets.CODECOV_TOKEN }} -# files: coverage.xml -# flags: performance-tests +# TODO: add performance tests upload when added static_code_analysis: diff --git a/tests/software_tests/can/test_flow_control.py b/tests/software_tests/can/test_flow_control.py index 13182d5b..cd0923d8 100644 --- a/tests/software_tests/can/test_flow_control.py +++ b/tests/software_tests/can/test_flow_control.py @@ -1,12 +1,14 @@ import pytest -from mock import call, patch +from mock import Mock, call, patch from uds.can import CanAddressingFormat from uds.can.flow_control import ( + AbstractFlowControlParametersGenerator, CanDlcHandler, CanFlowControlHandler, CanFlowStatus, CanSTminTranslator, + DefaultFlowControlParametersGenerator, InconsistentArgumentsError, ) from uds.utilities import NibbleEnum, ValidatedEnum @@ -796,3 +798,83 @@ def test_validate_frame_data__invalid(self, addressing_format, raw_frame_data): with pytest.raises(ValueError): CanFlowControlHandler.validate_frame_data(addressing_format=addressing_format, raw_frame_data=raw_frame_data) + + +class TestAbstractFlowControlParametersGenerator: + """Unit tests for 'AbstractFlowControlParametersGenerator' class.""" + + def setup_method(self): + self.mock_flow_control_parameters_generator = Mock(spec=AbstractFlowControlParametersGenerator) + + # __iter__ + + def test_iter(self): + assert (AbstractFlowControlParametersGenerator.__iter__(self.mock_flow_control_parameters_generator) + == self.mock_flow_control_parameters_generator) + + +class TestDefaultFlowControlParametersGenerator: + """Unit tests for 'DefaultFlowControlParametersGenerator' class.""" + + def setup_method(self): + self.mock_flow_control_parameters_generator = Mock(spec=AbstractFlowControlParametersGenerator) + # patching + self._patcher_validate_raw_byte = patch(f"{SCRIPT_LOCATION}.validate_raw_byte") + self.mock_validate_raw_byte = self._patcher_validate_raw_byte.start() + + def teardown_method(self): + self._patcher_validate_raw_byte.stop() + + # inheritance + + def test_inheritance(self): + assert issubclass(DefaultFlowControlParametersGenerator, AbstractFlowControlParametersGenerator) + + # __init__ + + @pytest.mark.parametrize("block_size, st_min", [ + (0, 0), + (0xFF, 0xFF) + ]) + def test_init(self, block_size, st_min): + DefaultFlowControlParametersGenerator.__init__(self.mock_flow_control_parameters_generator, block_size, st_min) + assert self.mock_flow_control_parameters_generator.block_size == block_size + assert self.mock_flow_control_parameters_generator.st_min == st_min + + # next + + @pytest.mark.parametrize("block_size, st_min", [ + (0, 0), + (0xFF, 0xFF) + ]) + def test_next(self, block_size, st_min): + self.mock_flow_control_parameters_generator.block_size = block_size + self.mock_flow_control_parameters_generator.st_min = st_min + assert (DefaultFlowControlParametersGenerator.__next__(self.mock_flow_control_parameters_generator) == + (CanFlowStatus.ContinueToSend, block_size, st_min)) + + # block_size + + @pytest.mark.parametrize("value", ["something", Mock()]) + def test_block_size__get(self, value): + self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__block_size = value + assert DefaultFlowControlParametersGenerator.block_size.fget(self.mock_flow_control_parameters_generator) == value + + @pytest.mark.parametrize("value", ["something", Mock()]) + def test_flow_control_parameters_generator__set(self, value): + DefaultFlowControlParametersGenerator.block_size.fset(self.mock_flow_control_parameters_generator, value) + assert self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__block_size == value + self.mock_validate_raw_byte.assert_called_once_with(value) + + # st_min + + @pytest.mark.parametrize("value", ["something", Mock()]) + def test_st_min__get(self, value): + self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__st_min = value + assert DefaultFlowControlParametersGenerator.st_min.fget(self.mock_flow_control_parameters_generator) == value + + @pytest.mark.parametrize("value", ["something", Mock()]) + def test_flow_control_parameters_generator__set(self, value): + DefaultFlowControlParametersGenerator.st_min.fset(self.mock_flow_control_parameters_generator, value) + assert self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__st_min == value + self.mock_validate_raw_byte.assert_called_once_with(value) diff --git a/tests/software_tests/transport_interface/test_can_transport_interface.py b/tests/software_tests/transport_interface/test_can_transport_interface.py index e16b3fc3..f646f64e 100644 --- a/tests/software_tests/transport_interface/test_can_transport_interface.py +++ b/tests/software_tests/transport_interface/test_can_transport_interface.py @@ -5,9 +5,11 @@ from uds.transport_interface.can_transport_interface import ( AbstractCanAddressingInformation, AbstractCanTransportInterface, + AbstractFlowControlParametersGenerator, BusABC, CanPacket, CanPacketType, + DefaultFlowControlParametersGenerator, PyCanTransportInterface, TransmissionDirection, ) @@ -74,22 +76,24 @@ def test_init__valid_mandatory_args(self, mock_isinstance, assert self.mock_can_transport_interface.n_br == self.mock_can_transport_interface.DEFAULT_N_BR assert self.mock_can_transport_interface.n_cs == self.mock_can_transport_interface.DEFAULT_N_CS assert self.mock_can_transport_interface.n_cr_timeout == self.mock_can_transport_interface.N_CR_TIMEOUT + assert (self.mock_can_transport_interface.flow_control_parameters_generator + == self.mock_can_transport_interface.DEFAULT_FLOW_CONTROL_PARAMETERS) @pytest.mark.parametrize("can_bus_manager, addressing_information", [ ("can_bus_manager", "addressing_information"), (Mock(), Mock()), ]) @pytest.mark.parametrize("n_as_timeout, n_ar_timeout, n_bs_timeout, n_br, n_cs, n_cr_timeout, " - "dlc, use_data_optimization, filler_byte", [ + "dlc, use_data_optimization, filler_byte, flow_control_parameters_generator", [ ("n_as_timeout", "n_ar_timeout", "n_bs_timeout", "n_br", "n_cs", "n_cr_timeout", "dlc", "use_data_optimization", - "filler_byte"), - (Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock()), + "filler_byte", "flow_control_parameters_generator"), + (Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock()), ]) @patch(f"{SCRIPT_LOCATION}.isinstance") def test_init__valid_all_args(self, mock_isinstance, can_bus_manager, addressing_information, n_as_timeout, n_ar_timeout, n_bs_timeout, n_br, n_cs, n_cr_timeout, - dlc, use_data_optimization, filler_byte): + dlc, use_data_optimization, filler_byte, flow_control_parameters_generator): mock_isinstance.return_value = True AbstractCanTransportInterface.__init__(self=self.mock_can_transport_interface, can_bus_manager=can_bus_manager, @@ -102,7 +106,8 @@ def test_init__valid_all_args(self, mock_isinstance, n_cr_timeout=n_cr_timeout, dlc=dlc, use_data_optimization=use_data_optimization, - filler_byte=filler_byte) + filler_byte=filler_byte, + flow_control_parameters_generator=flow_control_parameters_generator) mock_isinstance.assert_called_once_with(addressing_information, AbstractCanAddressingInformation) self.mock_abstract_transport_interface_init.assert_called_once_with(bus_manager=can_bus_manager) self.mock_can_segmenter_class.assert_called_once_with( @@ -120,6 +125,7 @@ def test_init__valid_all_args(self, mock_isinstance, assert self.mock_can_transport_interface.n_br == n_br assert self.mock_can_transport_interface.n_cs == n_cs assert self.mock_can_transport_interface.n_cr_timeout == n_cr_timeout + assert self.mock_can_transport_interface.flow_control_parameters_generator == flow_control_parameters_generator # segmenter @@ -498,6 +504,25 @@ def test_filler_byte__set(self, value): AbstractCanTransportInterface.filler_byte.fset(self.mock_can_transport_interface, value) assert self.mock_can_transport_interface.segmenter.filler_byte == value + # flow_control_parameters_generator + + @pytest.mark.parametrize("value", ["something", Mock()]) + def test_flow_control_parameters_generator__get(self, value): + self.mock_can_transport_interface._AbstractCanTransportInterface__flow_control_parameters_generator = value + assert AbstractCanTransportInterface.flow_control_parameters_generator.fget(self.mock_can_transport_interface) \ + == value + + @pytest.mark.parametrize("value", [Mock(spec=AbstractFlowControlParametersGenerator), + Mock(spec=DefaultFlowControlParametersGenerator)]) + def test_flow_control_parameters_generator__set(self, value): + AbstractCanTransportInterface.flow_control_parameters_generator.fset(self.mock_can_transport_interface, value) + assert self.mock_can_transport_interface._AbstractCanTransportInterface__flow_control_parameters_generator == value + + @pytest.mark.parametrize("value", ["something", Mock()]) + def test_flow_control_parameters_generator__set__type_error(self, value): + with pytest.raises(TypeError): + AbstractCanTransportInterface.flow_control_parameters_generator.fset(self.mock_can_transport_interface, value) + class TestPyCanTransportInterface: """Unit tests for `PyCanTransportInterface` class.""" diff --git a/uds/can/__init__.py b/uds/can/__init__.py index a5e51ce4..bc6dfd97 100644 --- a/uds/can/__init__.py +++ b/uds/can/__init__.py @@ -21,7 +21,14 @@ from .consecutive_frame import CanConsecutiveFrameHandler from .extended_addressing_information import ExtendedCanAddressingInformation from .first_frame import CanFirstFrameHandler -from .flow_control import CanFlowControlHandler, CanFlowStatus, CanSTminTranslator +from .flow_control import ( + AbstractFlowControlParametersGenerator, + CanFlowControlHandler, + CanFlowStatus, + CanSTminTranslator, + DefaultFlowControlParametersGenerator, + FlowControlParametersAlias, +) from .frame_fields import DEFAULT_FILLER_BYTE, CanDlcHandler, CanIdHandler from .mixed_addressing_information import Mixed11BitCanAddressingInformation, Mixed29BitCanAddressingInformation from .normal_addressing_information import Normal11BitCanAddressingInformation, NormalFixedCanAddressingInformation diff --git a/uds/can/flow_control.py b/uds/can/flow_control.py index 75ecce56..35f6dc06 100644 --- a/uds/can/flow_control.py +++ b/uds/can/flow_control.py @@ -7,9 +7,12 @@ - :ref:`Separation Time minimum (STmin) ` """ -__all__ = ["CanFlowStatus", "CanSTminTranslator", "CanFlowControlHandler", "UnrecognizedSTminWarning"] +__all__ = ["CanFlowStatus", "CanSTminTranslator", "CanFlowControlHandler", "UnrecognizedSTminWarning", + "AbstractFlowControlParametersGenerator", "DefaultFlowControlParametersGenerator", + "FlowControlParametersAlias"] -from typing import Optional +from abc import ABC, abstractmethod +from typing import Optional, Tuple from warnings import warn from aenum import unique @@ -474,3 +477,85 @@ def __encode_any_flow_status(cls, validate_raw_byte(st_min) output.append(st_min) return output + + +FlowControlParametersAlias = Tuple[int, Optional[int], Optional[int]] +"""Alias of :ref:`Flow Control ` parameters which contains: +- :ref:`Flow Status ` +- :ref:`Block Size ` +- :ref:`Separation Time minimum `""" + + +class AbstractFlowControlParametersGenerator(ABC): + """Definition of Flow Control parameters generator.""" + + def __iter__(self) -> "AbstractFlowControlParametersGenerator": + """Get iterator object - called on each Single Frame reception.""" + return self + + @abstractmethod + def __next__(self) -> FlowControlParametersAlias: + """ + Generate next set of Flow Control parameters - called on each Flow Control message building. + + :return: Tuple with values of Flow Control parameters (Flow Status, Block Size, ST min). + """ + + +class DefaultFlowControlParametersGenerator(AbstractFlowControlParametersGenerator): + """ + Default (recommended to use) Flow Control parameters generator. + + Every generated Flow Control parameters will contain the same (valid) values. + """ + + def __init__(self, block_size: int = 0, st_min: int = 0): + """ + Set values of Block Size and Separation Time minimum parameters to use. + + :param block_size: Value of :ref:`Block Size ` parameter to use. + :param st_min: Value of :ref:`Separation Time minimum ` parameter to use. + """ + self.block_size = block_size + self.st_min = st_min + + def __next__(self): + """ + Generate next set of Flow Control parameters. + + :return: Tuple with values of Flow Control parameters: + - Flow Status=0 (continue to send packets) + - Block Size (user provided) + - Separation Time minimum (user provided) + """ + return CanFlowStatus.ContinueToSend, self.block_size, self.st_min + + @property + def block_size(self) -> int: + """Value of Block Size parameter.""" + return self.__block_size + + @block_size.setter + def block_size(self, value: int): + """ + Set value of Block Size parameter. + + :param value: Value to set. + """ + validate_raw_byte(value) + self.__block_size = value + + @property + def st_min(self) -> int: + """Value of Separation Time minimum parameter.""" + return self.__st_min + + @st_min.setter + def st_min(self, value: int): + """ + Set value of Separation Time minimum parameter. + + :param value: Value to set. + """ + validate_raw_byte(value) + self.__st_min = value diff --git a/uds/transport_interface/can_transport_interface.py b/uds/transport_interface/can_transport_interface.py index 7c467006..963c64e1 100644 --- a/uds/transport_interface/can_transport_interface.py +++ b/uds/transport_interface/can_transport_interface.py @@ -10,7 +10,13 @@ from warnings import warn from can import AsyncBufferedReader, BufferedReader, BusABC, Message, Notifier -from uds.can import AbstractCanAddressingInformation, CanDlcHandler, CanIdHandler +from uds.can import ( + AbstractCanAddressingInformation, + AbstractFlowControlParametersGenerator, + CanDlcHandler, + CanIdHandler, + DefaultFlowControlParametersGenerator, +) from uds.packet import CanPacket, CanPacketRecord, CanPacketType from uds.segmentation import CanSegmenter from uds.transmission_attributes import TransmissionDirection @@ -39,6 +45,10 @@ class AbstractCanTransportInterface(AbstractTransportInterface): """Default value of :ref:`N_Br ` time parameter.""" DEFAULT_N_CS: Optional[TimeMillisecondsAlias] = None """Default value of :ref:`N_Cs ` time parameter.""" + DEFAULT_FLOW_CONTROL_PARAMETERS = DefaultFlowControlParametersGenerator() + """Default value of :ref:`Flow Control ` parameters ( + :ref:`Flow Status `, :ref:`Block Size `, + :ref:`Separation Time minimum `).""" def __init__(self, can_bus_manager: Any, @@ -60,6 +70,7 @@ def __init__(self, - :parameter dlc: Base CAN DLC value to use for CAN Packets. - :parameter use_data_optimization: Information whether to use CAN Frame Data Optimization. - :parameter filler_byte: Filler byte value to use for CAN Frame Data Padding. + - :parameter flow_control_parameters_generator: Generator with Flow Control parameters to use. :raise TypeError: Provided Addressing Information value has unexpected type. """ @@ -73,6 +84,8 @@ def __init__(self, self.n_cr_timeout = kwargs.pop("n_cr_timeout", self.N_CR_TIMEOUT) self.n_br = kwargs.pop("n_br", self.DEFAULT_N_BR) self.n_cs = kwargs.pop("n_cs", self.DEFAULT_N_CS) + self.flow_control_parameters_generator = kwargs.pop("flow_control_parameters_generator", + self.DEFAULT_FLOW_CONTROL_PARAMETERS) self.__segmenter = CanSegmenter(addressing_information=addressing_information, **kwargs) @property @@ -353,6 +366,22 @@ def filler_byte(self, value: int): """ self.segmenter.filler_byte = value + @property + def flow_control_parameters_generator(self) -> AbstractFlowControlParametersGenerator: + """Get generator of Flow Control parameters (Flow Status, Block Size, Separation Time minimum).""" + return self.__flow_control_parameters_generator + + @flow_control_parameters_generator.setter + def flow_control_parameters_generator(self, value: AbstractFlowControlParametersGenerator): + """ + Set value of Flow Control parameters (Flow Status, Block Size, Separation Time minimum) generator. + + :param value: Value to set. + """ + if not isinstance(value, AbstractFlowControlParametersGenerator): + raise TypeError("Provided Flow Control parameters generator value has incorrect type.") + self.__flow_control_parameters_generator = value + class PyCanTransportInterface(AbstractCanTransportInterface): """ @@ -389,6 +418,7 @@ def __init__(self, - :parameter dlc: Base CAN DLC value to use for CAN Packets. - :parameter use_data_optimization: Information whether to use CAN Frame Data Optimization. - :parameter filler_byte: Filler byte value to use for CAN Frame Data Padding. + - :parameter flow_control_parameters_generator: Generator with Flow Control parameters to use. """ self.__n_as_measured: Optional[TimeMillisecondsAlias] = None self.__n_ar_measured: Optional[TimeMillisecondsAlias] = None