Skip to content

Commit

Permalink
Configuration of Flow Control packets generation (#273)
Browse files Browse the repository at this point in the history
Add generator of Flow Control parameters that would be used to generate Flow Control packets during message reception.
  • Loading branch information
mdabrowski1990 authored Jul 29, 2024
1 parent 51f8e08 commit 50e273b
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 28 deletions.
23 changes: 5 additions & 18 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,14 @@ jobs:
run: |
pip install .[test]
- name: Prepare coverage uploader [CodeCov]
run: |
curl -Os https://app.codecov.io/gh/mdabrowski1990/uds/uploader/linux/codecov
chmod +x codecov
- name: Execute unit tests [pytest]
run: |
pytest tests/software_tests --cov-report=xml --cov=uds -m "not integration and not performance"
- name: Upload unit tests report [CodeCov]
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.xml
flags: unit-tests
Expand All @@ -86,23 +82,14 @@ jobs:
pytest tests/software_tests --cov-report=xml --cov=uds -m "integration"
- name: Upload integration tests report [CodeCov]
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.xml
flags: integration-tests

# TODO: uncomment when performance tests are added
# - name: Execute performance tests [pytest]
# run: |
# pytest tests/software_tests --cov-report=xml --cov=uds -m "performance"
#
# - name: Upload performance tests report [CodeCov]
# uses: codecov/codecov-action@v3
# with:
# token: ${{ secrets.CODECOV_TOKEN }}
# files: coverage.xml
# flags: performance-tests
# TODO: add performance tests upload when added


static_code_analysis:
Expand Down
84 changes: 83 additions & 1 deletion tests/software_tests/can/test_flow_control.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import pytest
from mock import call, patch
from mock import Mock, call, patch

from uds.can import CanAddressingFormat
from uds.can.flow_control import (
AbstractFlowControlParametersGenerator,
CanDlcHandler,
CanFlowControlHandler,
CanFlowStatus,
CanSTminTranslator,
DefaultFlowControlParametersGenerator,
InconsistentArgumentsError,
)
from uds.utilities import NibbleEnum, ValidatedEnum
Expand Down Expand Up @@ -796,3 +798,83 @@ def test_validate_frame_data__invalid(self, addressing_format, raw_frame_data):
with pytest.raises(ValueError):
CanFlowControlHandler.validate_frame_data(addressing_format=addressing_format,
raw_frame_data=raw_frame_data)


class TestAbstractFlowControlParametersGenerator:
"""Unit tests for 'AbstractFlowControlParametersGenerator' class."""

def setup_method(self):
self.mock_flow_control_parameters_generator = Mock(spec=AbstractFlowControlParametersGenerator)

# __iter__

def test_iter(self):
assert (AbstractFlowControlParametersGenerator.__iter__(self.mock_flow_control_parameters_generator)
== self.mock_flow_control_parameters_generator)


class TestDefaultFlowControlParametersGenerator:
"""Unit tests for 'DefaultFlowControlParametersGenerator' class."""

def setup_method(self):
self.mock_flow_control_parameters_generator = Mock(spec=AbstractFlowControlParametersGenerator)
# patching
self._patcher_validate_raw_byte = patch(f"{SCRIPT_LOCATION}.validate_raw_byte")
self.mock_validate_raw_byte = self._patcher_validate_raw_byte.start()

def teardown_method(self):
self._patcher_validate_raw_byte.stop()

# inheritance

def test_inheritance(self):
assert issubclass(DefaultFlowControlParametersGenerator, AbstractFlowControlParametersGenerator)

# __init__

@pytest.mark.parametrize("block_size, st_min", [
(0, 0),
(0xFF, 0xFF)
])
def test_init(self, block_size, st_min):
DefaultFlowControlParametersGenerator.__init__(self.mock_flow_control_parameters_generator, block_size, st_min)
assert self.mock_flow_control_parameters_generator.block_size == block_size
assert self.mock_flow_control_parameters_generator.st_min == st_min

# next

@pytest.mark.parametrize("block_size, st_min", [
(0, 0),
(0xFF, 0xFF)
])
def test_next(self, block_size, st_min):
self.mock_flow_control_parameters_generator.block_size = block_size
self.mock_flow_control_parameters_generator.st_min = st_min
assert (DefaultFlowControlParametersGenerator.__next__(self.mock_flow_control_parameters_generator) ==
(CanFlowStatus.ContinueToSend, block_size, st_min))

# block_size

@pytest.mark.parametrize("value", ["something", Mock()])
def test_block_size__get(self, value):
self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__block_size = value
assert DefaultFlowControlParametersGenerator.block_size.fget(self.mock_flow_control_parameters_generator) == value

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__set(self, value):
DefaultFlowControlParametersGenerator.block_size.fset(self.mock_flow_control_parameters_generator, value)
assert self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__block_size == value
self.mock_validate_raw_byte.assert_called_once_with(value)

# st_min

@pytest.mark.parametrize("value", ["something", Mock()])
def test_st_min__get(self, value):
self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__st_min = value
assert DefaultFlowControlParametersGenerator.st_min.fget(self.mock_flow_control_parameters_generator) == value

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__set(self, value):
DefaultFlowControlParametersGenerator.st_min.fset(self.mock_flow_control_parameters_generator, value)
assert self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__st_min == value
self.mock_validate_raw_byte.assert_called_once_with(value)
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from uds.transport_interface.can_transport_interface import (
AbstractCanAddressingInformation,
AbstractCanTransportInterface,
AbstractFlowControlParametersGenerator,
BusABC,
CanPacket,
CanPacketType,
DefaultFlowControlParametersGenerator,
PyCanTransportInterface,
TransmissionDirection,
)
Expand Down Expand Up @@ -74,22 +76,24 @@ def test_init__valid_mandatory_args(self, mock_isinstance,
assert self.mock_can_transport_interface.n_br == self.mock_can_transport_interface.DEFAULT_N_BR
assert self.mock_can_transport_interface.n_cs == self.mock_can_transport_interface.DEFAULT_N_CS
assert self.mock_can_transport_interface.n_cr_timeout == self.mock_can_transport_interface.N_CR_TIMEOUT
assert (self.mock_can_transport_interface.flow_control_parameters_generator
== self.mock_can_transport_interface.DEFAULT_FLOW_CONTROL_PARAMETERS)

@pytest.mark.parametrize("can_bus_manager, addressing_information", [
("can_bus_manager", "addressing_information"),
(Mock(), Mock()),
])
@pytest.mark.parametrize("n_as_timeout, n_ar_timeout, n_bs_timeout, n_br, n_cs, n_cr_timeout, "
"dlc, use_data_optimization, filler_byte", [
"dlc, use_data_optimization, filler_byte, flow_control_parameters_generator", [
("n_as_timeout", "n_ar_timeout", "n_bs_timeout", "n_br", "n_cs", "n_cr_timeout", "dlc", "use_data_optimization",
"filler_byte"),
(Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock()),
"filler_byte", "flow_control_parameters_generator"),
(Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock()),
])
@patch(f"{SCRIPT_LOCATION}.isinstance")
def test_init__valid_all_args(self, mock_isinstance,
can_bus_manager, addressing_information,
n_as_timeout, n_ar_timeout, n_bs_timeout, n_br, n_cs, n_cr_timeout,
dlc, use_data_optimization, filler_byte):
dlc, use_data_optimization, filler_byte, flow_control_parameters_generator):
mock_isinstance.return_value = True
AbstractCanTransportInterface.__init__(self=self.mock_can_transport_interface,
can_bus_manager=can_bus_manager,
Expand All @@ -102,7 +106,8 @@ def test_init__valid_all_args(self, mock_isinstance,
n_cr_timeout=n_cr_timeout,
dlc=dlc,
use_data_optimization=use_data_optimization,
filler_byte=filler_byte)
filler_byte=filler_byte,
flow_control_parameters_generator=flow_control_parameters_generator)
mock_isinstance.assert_called_once_with(addressing_information, AbstractCanAddressingInformation)
self.mock_abstract_transport_interface_init.assert_called_once_with(bus_manager=can_bus_manager)
self.mock_can_segmenter_class.assert_called_once_with(
Expand All @@ -120,6 +125,7 @@ def test_init__valid_all_args(self, mock_isinstance,
assert self.mock_can_transport_interface.n_br == n_br
assert self.mock_can_transport_interface.n_cs == n_cs
assert self.mock_can_transport_interface.n_cr_timeout == n_cr_timeout
assert self.mock_can_transport_interface.flow_control_parameters_generator == flow_control_parameters_generator

# segmenter

Expand Down Expand Up @@ -498,6 +504,25 @@ def test_filler_byte__set(self, value):
AbstractCanTransportInterface.filler_byte.fset(self.mock_can_transport_interface, value)
assert self.mock_can_transport_interface.segmenter.filler_byte == value

# flow_control_parameters_generator

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__get(self, value):
self.mock_can_transport_interface._AbstractCanTransportInterface__flow_control_parameters_generator = value
assert AbstractCanTransportInterface.flow_control_parameters_generator.fget(self.mock_can_transport_interface) \
== value

@pytest.mark.parametrize("value", [Mock(spec=AbstractFlowControlParametersGenerator),
Mock(spec=DefaultFlowControlParametersGenerator)])
def test_flow_control_parameters_generator__set(self, value):
AbstractCanTransportInterface.flow_control_parameters_generator.fset(self.mock_can_transport_interface, value)
assert self.mock_can_transport_interface._AbstractCanTransportInterface__flow_control_parameters_generator == value

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__set__type_error(self, value):
with pytest.raises(TypeError):
AbstractCanTransportInterface.flow_control_parameters_generator.fset(self.mock_can_transport_interface, value)


class TestPyCanTransportInterface:
"""Unit tests for `PyCanTransportInterface` class."""
Expand Down
9 changes: 8 additions & 1 deletion uds/can/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
from .consecutive_frame import CanConsecutiveFrameHandler
from .extended_addressing_information import ExtendedCanAddressingInformation
from .first_frame import CanFirstFrameHandler
from .flow_control import CanFlowControlHandler, CanFlowStatus, CanSTminTranslator
from .flow_control import (
AbstractFlowControlParametersGenerator,
CanFlowControlHandler,
CanFlowStatus,
CanSTminTranslator,
DefaultFlowControlParametersGenerator,
FlowControlParametersAlias,
)
from .frame_fields import DEFAULT_FILLER_BYTE, CanDlcHandler, CanIdHandler
from .mixed_addressing_information import Mixed11BitCanAddressingInformation, Mixed29BitCanAddressingInformation
from .normal_addressing_information import Normal11BitCanAddressingInformation, NormalFixedCanAddressingInformation
Expand Down
89 changes: 87 additions & 2 deletions uds/can/flow_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
- :ref:`Separation Time minimum (STmin) <knowledge-base-can-st-min>`
"""

__all__ = ["CanFlowStatus", "CanSTminTranslator", "CanFlowControlHandler", "UnrecognizedSTminWarning"]
__all__ = ["CanFlowStatus", "CanSTminTranslator", "CanFlowControlHandler", "UnrecognizedSTminWarning",
"AbstractFlowControlParametersGenerator", "DefaultFlowControlParametersGenerator",
"FlowControlParametersAlias"]

from typing import Optional
from abc import ABC, abstractmethod
from typing import Optional, Tuple
from warnings import warn

from aenum import unique
Expand Down Expand Up @@ -474,3 +477,85 @@ def __encode_any_flow_status(cls,
validate_raw_byte(st_min)
output.append(st_min)
return output


FlowControlParametersAlias = Tuple[int, Optional[int], Optional[int]]
"""Alias of :ref:`Flow Control <knowledge-base-can-flow-control>` parameters which contains:
- :ref:`Flow Status <knowledge-base-can-flow-status>`
- :ref:`Block Size <knowledge-base-can-block-size>`
- :ref:`Separation Time minimum <knowledge-base-can-st-min>`"""


class AbstractFlowControlParametersGenerator(ABC):
"""Definition of Flow Control parameters generator."""

def __iter__(self) -> "AbstractFlowControlParametersGenerator":
"""Get iterator object - called on each Single Frame reception."""
return self

@abstractmethod
def __next__(self) -> FlowControlParametersAlias:
"""
Generate next set of Flow Control parameters - called on each Flow Control message building.
:return: Tuple with values of Flow Control parameters (Flow Status, Block Size, ST min).
"""


class DefaultFlowControlParametersGenerator(AbstractFlowControlParametersGenerator):
"""
Default (recommended to use) Flow Control parameters generator.
Every generated Flow Control parameters will contain the same (valid) values.
"""

def __init__(self, block_size: int = 0, st_min: int = 0):
"""
Set values of Block Size and Separation Time minimum parameters to use.
:param block_size: Value of :ref:`Block Size <knowledge-base-can-block-size>` parameter to use.
:param st_min: Value of :ref:`Separation Time minimum <knowledge-base-can-st-min>` parameter to use.
"""
self.block_size = block_size
self.st_min = st_min

def __next__(self):
"""
Generate next set of Flow Control parameters.
:return: Tuple with values of Flow Control parameters:
- Flow Status=0 (continue to send packets)
- Block Size (user provided)
- Separation Time minimum (user provided)
"""
return CanFlowStatus.ContinueToSend, self.block_size, self.st_min

@property
def block_size(self) -> int:
"""Value of Block Size parameter."""
return self.__block_size

@block_size.setter
def block_size(self, value: int):
"""
Set value of Block Size parameter.
:param value: Value to set.
"""
validate_raw_byte(value)
self.__block_size = value

@property
def st_min(self) -> int:
"""Value of Separation Time minimum parameter."""
return self.__st_min

@st_min.setter
def st_min(self, value: int):
"""
Set value of Separation Time minimum parameter.
:param value: Value to set.
"""
validate_raw_byte(value)
self.__st_min = value
Loading

0 comments on commit 50e273b

Please sign in to comment.