diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..7c1a04e --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +.devcontainer/ +build/ +dist/ diff --git a/.flake8 b/.flake8 index 1387491..ef8aeb2 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,4 @@ [flake8] extend-ignore = E266, E501, E203 max-line-length = 88 -max-complexity = 16 \ No newline at end of file +max-complexity = 16 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..a09f57f --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,40 @@ +name: build wheel for release + +on: + release: + types: [published] + +permissions: + contents: write # upload artifacts requires this permission + +jobs: + build_wheel: + runs-on: ubuntu-22.04 + steps: + - name: Checkout source code + uses: actions/checkout@v4 + + - name: Setup python environment + uses: actions/setup-python@v5 + with: + # use the minimum py ver we support to + # generate the wheel. + python-version: "3.8" + + - name: Build wheel and calculate checksum + run: | + python3 -m pip install -q -U pip + pip install -q -U hatch + hatch build -t wheel + for WHL in dist/*.whl; \ + do \ + sha256sum ${WHL} | sed -E "s@(\w+)\s+.*@sha256:\1@" > \ + ${WHL}.checksum; \ + done + + - name: Upload built wheel as release asset + uses: softprops/action-gh-release@v1 + with: + files: | + dist/*.whl + dist/*.checksum diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..099661b --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,121 @@ +name: test CI + +on: + pull_request: + branches: + - main + # only trigger unit test CI when src or tests changed + paths: + - "src/**" + - "tests/**" + - ".github/workflows/test.yaml" + push: + branches: + - main + # only trigger unit test CI when src or tests changed + paths: + - "src/**" + - "tests/**" + - ".github/workflows/test.yaml" + # allow the test CI to be manually triggerred + 
workflow_dispatch: + +jobs: + pytest_with_coverage_on_supported_os: + strategy: + fail-fast: true + matrix: + # currently we only need to ensure it is running on the following OS + # with OS-shipped python interpreter. + os: ["ubuntu-20.04", "ubuntu-22.04"] + include: + - os: ubuntu-22.04 + python_version: "3.10" + - os: ubuntu-20.04 + python_version: "3.8" + runs-on: ${{ matrix.os }} + steps: + - name: Checkout commit + uses: actions/checkout@v4 + with: + # sonarcloud needs main branch's ref + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python_version }} + cache: "pip" + + - name: Install package + run: | + python -m pip install -q -U pip + pip install -q .[dev] + + - name: Execute pytest with coverage + run: | + coverage run -m pytest --junit-xml=test_result/pytest.xml + coverage xml -o test_result/coverage.xml + + # export the coverage report to the comment! + - name: Add coverage report to PR comment + continue-on-error: true + uses: MishaKav/pytest-coverage-comment@v1.1.51 + with: + pytest-xml-coverage-path: test_result/coverage.xml + junitxml-path: test_result/pytest.xml + + - name: SonarCloud Scan + uses: SonarSource/sonarcloud-github-action@master + env: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + + pytest_on_supported_python_vers: + runs-on: ubuntu-22.04 + strategy: + fail-fast: true + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11"] + steps: + - name: Checkout commit + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + + - name: Install package + run: | + python -m pip install -q -U pip + pip install -q .[dev] + + - name: Execute pytest + run: pytest + + python_lint_check: + runs-on: ubuntu-22.04 + timeout-minutes: 3 + strategy: + fail-fast: true + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11"] + steps: + - name: Checkout commit + uses: 
actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install check dependencies + run: | + python -m pip install -q -U pip + pip install -q .[dev] + + - name: Check code linting + run: | + black --check src + flake8 src diff --git a/.gitignore b/.gitignore index bc1b77a..d6f5fa9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,164 @@ -.devcontainer/ -cache -coverage +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python build/ +develop-eggs/ dist/ -mypy_cache/ -otaclient_iot_logging_server/_version.py -__pycache__/ -venv/ \ No newline at end of file +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
+# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/ + +# local vscode settings +.devcontainer +.vscode diff --git a/.markdownlint.yaml b/.markdownlint.yaml new file mode 100644 index 0000000..44fcb49 --- /dev/null +++ b/.markdownlint.yaml @@ -0,0 +1,4 @@ +"MD013": false +"MD041": false +"MD024": + "siblings_only": true diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..23a9b4d --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,46 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.6.0 + hooks: + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/pre-commit/pygrep-hooks + rev: v1.10.0 + hooks: + - id: python-check-mock-methods + - id: python-use-type-annotations + # Using this mirror lets us use mypyc-compiled black, which is about 2x faster + - repo: https://github.com/psf/black-pre-commit-mirror + rev: 24.4.2 + hooks: + - id: black + # It is recommended to specify the latest version of Python + # supported by your project here, or alternatively use + # pre-commit's default_language_version, see + # https://pre-commit.com/#top_level-default_language_version + language_version: python3.11 + - repo: https://github.com/pycqa/isort + rev: 5.13.2 + hooks: + - id: isort + - repo: https://github.com/pycqa/flake8 + rev: 7.0.0 + hooks: + - id: flake8 + additional_dependencies: + - flake8-bugbear==24.2.6 + - flake8-comprehensions + - flake8-simplify + - repo: https://github.com/tox-dev/pyproject-fmt + rev: "1.8.0" + hooks: + - id: pyproject-fmt + # https://pyproject-fmt.readthedocs.io/en/latest/#calculating-max-supported-python-version + additional_dependencies: ["tox>=4.9"] + - repo: https://github.com/igorshubovych/markdownlint-cli + rev: v0.40.0 + hooks: + - id: markdownlint + args: ["-c", ".markdownlint.yaml", "--fix"] +ci: + autoupdate_schedule: monthly diff --git a/Dockerfile b/Dockerfile index e12bfd3..e959cee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,33 +4,66 @@ ARG PYTHON_BASE_VER=slim-bookworm 
ARG PYTHON_VENV=/venv # -# ------ prepare virtual env ------ # +# ------ prepare venv ------ # # -FROM python:${PYTHON_VERSION}-${PYTHON_BASE_VER} as deps_installer -# install build base +FROM python:${PYTHON_VERSION}-${PYTHON_BASE_VER} as venv_builder + +ARG PYTHON_VENV + +COPY ./src /source_code/src +COPY ./pyproject.toml /source_code/pyproject.toml + +# ------ install build deps ------ # RUN set -eux; \ - apt-get update && \ + apt-get update ; \ apt-get install -y --no-install-recommends \ python3-dev \ + libcurl4-openssl-dev \ + libssl-dev \ gcc \ - git + git -# install otaclient deps -ARG PYTHON_VENV -ARG OTACLIENT_REQUIREMENTS -COPY "${OTACLIENT_REQUIREMENTS}" /tmp/requirements.txt +# ------ setup virtual env and build ------ # +RUN set -eux ; \ + python3 -m venv ${PYTHON_VENV} ; \ + . ${PYTHON_VENV}/bin/activate ; \ + export PYTHONDONTWRITEBYTECODE=1 ; \ + cd /source_code ;\ + python3 -m pip install -U pip ; \ + python3 -m pip install . -RUN set -eux; \ - python3 -m venv ${PYTHON_VENV} && \ - . ${PYTHON_VENV}/bin/activate && \ - export PYTHONDONTWRITEBYTECODE=1 && \ - python3 -m pip install --no-cache-dir -U pip setuptools wheel && \ - python3 -m pip install --no-cache-dir -r /tmp/requirements.txt && \ - -# cleanup the virtualenv +# ------ post installation, cleanup ------ # +# cleanup the python venv again # see python-slim Dockerfile for more details +RUN set -eux ; \ find ${PYTHON_VENV} -depth \ - \( \ - \( -type d -a \( -name test -o -name tests -o -name idle_test \) \) \ - -o \( -type f -a \( -name '*.pyc' -o -name '*.pyo' -o -name 'libpython*.a' \) \) \ - \) -exec rm -rf '{}' + \ No newline at end of file + \( \ + \( -type d -a \( -name test -o -name tests -o -name idle_test \) \) \ + -o \( -type f -a \( -name '*.pyc' -o -name '*.pyo' -o -name 'libpython*.a' \) \) \ + \) -exec rm -rf '{}' + + +# +# ------ build final image ------ # +# +FROM python:${PYTHON_VERSION}-${PYTHON_BASE_VER} + +ARG PYTHON_VENV +ARG CMD_NAME + +COPY --from=venv_builder ${PYTHON_VENV} ${PYTHON_VENV} + +# add missing
libs +RUN set -eux ; \ + apt-get update ; \ + apt-get install -y --no-install-recommends \ + libcurl4 ; \ + rm -rf \ + /var/lib/apt/lists/* \ + /root/.cache \ + /tmp/* + +# add mount points placeholder +RUN mkdir -p /opt /greengrass + +ENV PATH="${PYTHON_VENV}/bin:${PATH}" + +CMD ["iot_logging_server"] diff --git a/README.md b/README.md index 725b86c..c20f9f5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,45 @@ # OTAClient AWS IoT logging server -A logging server that uploads logs sent from otaclient to AWS cloudwatch. \ No newline at end of file +A logging server that uploads logs sent from otaclient to AWS cloudwatch. + +This iot-logger is expected to be installed on the main ECU, with greengrass certificates and otaclient config file(ecu_info.yaml) installed. + +## TPM support + +If greengrass is configured to use TPM with pkcs11(priv-key sealed by TPM, with or without cert also stored in tpm-pkcs11 database), iot-logger will automatically enable TPM support when parsing the greengrass configuration file. + +## Filter uploaded logs + +If `ecu_info.yaml` presented and valid, iot-logger will only accept logs from known ECU ids. +The known ECU ids are retrieved from parsing `ecu_info.secondaries` field. +Currently only ECU id will be checked, IP checking is not performed as sub ECU otaclient might send logging from different IPs if ECU has multiple interfaces. + +NOTE that if `ecu_info.yaml` file is not presented, the filtering will be DISABLED. + +## Auto restart on config files changed + +By default, the `EXIT_ON_CONFIG_FILE_CHANGED` is enabled. +Together with systemd.service `Restart` policy configured, automatically restart iot-logger server on config files changed can be achieved. 
+ +## Usage + +### Environmental variables + +The behaviors of the iot_logging_server can be configured with the following environmental variables: + +| Environmental variables | Default value | Description | +| ---- | ---- | --- | +| GREENGRASS_V1_CONFIG | `/greengrass/config/config.json` | | +| GREENGRASS_V2_CONFIG | `/greengrass/v2/init_config/config.yaml` | If both v1 and v2 config file exist, v2 will be used in prior. | +| AWS_PROFILE_INFO | `/opt/ota/iot_logger/aws_profile_info.yaml` | The location of AWS profile info mapping files. | +| ECU_INFO_YAML | `/boot/ota/ecu_info.yaml` | The location of ecu_info.yaml config file. iot-logger server will parse the config file and only process logs sending from known ECUs.| +| LISTEN_ADDRESS | `127.0.0.1` | The IP address iot-logger server listen on. By default only receive logs from local machine. | +| LISTEN_PORT | `8083` | | +| UPLOAD_LOGGING_SERVER_LOGS | `false` | Whether to upload the logs from server itself to cloudwatchlogs. | +| SERVER_LOGSTREAM_SUFFIX | `iot_logging_server` | log_stream suffix for local server logs on cloudwatchlogs if uploaded. | +| SERVER_LOGGING_LEVEL | `INFO` | The logging level of the server itself. | +| SERVER_LOGGING_LOG_FORMAT | `[%(asctime)s][%(levelname)s]-%(name)s:%(funcName)s:%(lineno)d,%(message)s` | | +| MAX_LOGS_BACKLOG | `4096` | Max pending log entries. | +| MAX_LOGS_PER_MERGE | `512` | Max log entries in a merge group. | +| UPLOAD_INTERVAL | `3` | Interval of uploading log batches to cloud. **Note that if the logger is restarted before next upload occurs, the pending loggings will be dropped.** | +| EXIT_ON_CONFIG_FILE_CHANGED | `true` | Whether to kill the server on config files changed. 
**Note that this feature is expected to be used together with systemd.service Restart.** | diff --git a/examples/aws_profile_info.yaml b/examples/aws_profile_info.yaml new file mode 100644 index 0000000..16a374e --- /dev/null +++ b/examples/aws_profile_info.yaml @@ -0,0 +1,9 @@ +- profile_name: "profile-dev" + account_id: "012345678901" + credential_endpoint: "abcdefghijk01.credentials.iot.region.amazonaws.com" +- profile_name: "profile-stg" + account_id: "012345678902" + credential_endpoint: "abcdefghijk02.credentials.iot.region.amazonaws.com" +- profile_name: "profile-prd" + account_id: "012345678903" + credential_endpoint: "abcdefghijk03.credentials.iot.region.amazonaws.com" diff --git a/examples/otaclient-logger.service b/examples/otaclient-logger.service new file mode 100644 index 0000000..294ca9b --- /dev/null +++ b/examples/otaclient-logger.service @@ -0,0 +1,17 @@ +[Unit] +Description=OTAClient AWS Iot logging server +Wants=network-online.target +After=network-online.target nss-lookup.target + +[Service] +ExecStart=/opt/ota/iot_logger/venv/bin/iot_logging_server +Environment=LISTEN_ADDRESS=127.0.0.1 +Environment=LISTEN_PORT=8083 +Environment=UPLOAD_LOGGING_SERVER_LOGS=true +Environment=SERVER_LOGGING_LEVEL=INFO +Restart=on-failure +RestartSec=10 +Type=simple + +[Install] +WantedBy=multi-user.target diff --git a/otaclient_iot_logging_server/__main__.py b/otaclient_iot_logging_server/__main__.py deleted file mode 100644 index cbc23f0..0000000 --- a/otaclient_iot_logging_server/__main__.py +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from __future__ import annotations - -import logging - -from otaclient_iot_logging_server import __version__ -from otaclient_iot_logging_server import package_name as root_package_name -from otaclient_iot_logging_server.configs import server_cfg -from otaclient_iot_logging_server.greengrass_config import parse_config -from otaclient_iot_logging_server.log_proxy_server import launch_server - - -def main(): - # ------ configure the root logger ------ # - # NOTE: for the root logger, set to CRITICAL to filter away logs from other - # external modules unless reached CRITICAL level. - logging.basicConfig( - level=logging.CRITICAL, format=server_cfg.SERVER_LOGGING_LOG_FORMAT, force=True - ) - # NOTE: set the to the package root logger - root_logger = logging.getLogger(root_package_name) - root_logger.setLevel(server_cfg.SERVER_LOGGING_LEVEL) - # ------ launch server ------ # - launch_server( - parse_config(), - max_logs_backlog=server_cfg.MAX_LOGS_BACKLOG, - max_logs_per_merge=server_cfg.MAX_LOGS_PER_MERGE, - interval=server_cfg.UPLOAD_INTERVAL, - ) - - root_logger.info( - f"logger server({__version__}) is launched at http://{server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT}" - ) - - -if __name__ == "__main__": - main() diff --git a/otaclient_iot_logging_server/aws_iot_logger.py b/otaclient_iot_logging_server/aws_iot_logger.py deleted file mode 100644 index e64af77..0000000 --- a/otaclient_iot_logging_server/aws_iot_logger.py +++ /dev/null @@ -1,194 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from __future__ import annotations - -import logging -import time -from datetime import datetime -from queue import Empty, Queue -from threading import Thread - -from typing_extensions import NoReturn - -from otaclient_iot_logging_server._common import LogMessage -from otaclient_iot_logging_server._utils import chain_query, retry -from otaclient_iot_logging_server.boto3_session import Boto3Session -from otaclient_iot_logging_server.greengrass_config import IoTSessionConfig - -logger = logging.getLogger(__name__) - - -def get_log_stream_name(thing_name: str, log_stream_sufix: str) -> str: - """Compose LogStream name. - - Schema: YYYY/MM/DD/ - """ - fmt = "{strftime:%Y/%m/%d}".format(strftime=datetime.utcnow()) - return f"{fmt}/{thing_name}/{log_stream_sufix}" - - -class AWSIoTLogger: - """ - - Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html - """ - - # this upper bound is defined by boto3, check doc for more details. 
- MAX_LOGS_PER_PUT = 10_000 - - def __init__( - self, - session_config: IoTSessionConfig, - queue: Queue[tuple[str, LogMessage]], - max_logs_per_merge: int, - interval: int, - ): - _boto3_session = Boto3Session(session_config) - self._client = _client = _boto3_session.get_session().client( - service_name="logs" - ) - self._session_config = session_config - self._exception = _client.exceptions - self._sequence_tokens = {} - self._interval = interval - self._queue: Queue[tuple[str, LogMessage]] = queue - self._max_logs_per_merge = max_logs_per_merge - # unconditionally create log_group and log_stream, do nothing if existed. - self._create_log_group(log_group_name=session_config.aws_cloudwatch_log_group) - - @retry(max_retry=16, backoff_factor=2, backoff_max=32) - def _create_log_group(self, log_group_name: str): - try: - self._client.create_log_group(logGroupName=log_group_name) - logger.info(f"{log_group_name=} has been created") - except self._exception.ResourceAlreadyExistsException as e: - logger.debug( - f"{log_group_name=} already existed, skip creating: {e.response}" - ) - except Exception as e: - logger.error(f"failed to create {log_group_name=}: {e!r}") - raise - - @retry(max_retry=16, backoff_factor=2, backoff_max=32) - def _create_log_stream(self, log_group_name: str, log_stream_name: str): - try: - self._client.create_log_stream( - logGroupName=log_group_name, - logStreamName=log_stream_name, - ) - logger.info(f"{log_stream_name=}@{log_group_name} has been created") - self._sequence_tokens = {} # clear sequence token on new stream created - except self._exception.ResourceAlreadyExistsException: - logger.debug( - f"{log_stream_name=}@{log_group_name} already existed, skip creating" - ) - except Exception as e: - logger.error(f"failed to create {log_stream_name=}@{log_group_name}: {e!r}") - raise - - @retry(backoff_factor=2) - def send_messages(self, log_stream_suffix: str, message_list: list[LogMessage]): - """ - Ref: - 
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html - """ - session_config, client = self._session_config, self._client - exceptions = client.exceptions - log_stream_name = get_log_stream_name( - session_config.thing_name, log_stream_suffix - ) - log_group_name = session_config.aws_cloudwatch_log_group - - request = { - "logGroupName": log_group_name, - "logStreamName": log_stream_name, - "logEvents": message_list, - } - if _seq_token := self._sequence_tokens.get(log_stream_name): - request["sequenceToken"] = _seq_token - # check message_list length - if len(message_list) > self.MAX_LOGS_PER_PUT: - logger.warning( - f"too much logs in a single put, ignore exceeded logs: {self.MAX_LOGS_PER_PUT=}" - ) - message_list = message_list[: self.MAX_LOGS_PER_PUT] - - try: - response = client.put_log_events(**request) - # NOTE: the sequenceToken is deprecated, put_log_events will always - # be accepted with/without a sequenceToken. - # see docs for more details. 
- if _sequence_token := response.get("nextSequenceToken"): - self._sequence_tokens[log_stream_name] = _sequence_token - except ( - exceptions.DataAlreadyAcceptedException, - exceptions.InvalidSequenceTokenException, - ) as e: - response = e.response - logger.debug(f"{response}: {e!r}") - - _resp_err_msg: str = chain_query(e.response, "Error", "Message", default="") - # null as the next sequenceToken means don't include any - # sequenceToken at all, not that the token should be set to "null" - next_expected_token = _resp_err_msg.rsplit(" ", 1)[-1] - if next_expected_token == "null": - self._sequence_tokens.pop(log_stream_name, None) - else: - self._sequence_tokens[log_stream_name] = next_expected_token - except client.exceptions.ResourceNotFoundException as e: - response = e.response - logger.info(f"{log_stream_name=} not found: {e!r}") - self._create_log_stream( - log_group_name=log_group_name, log_stream_name=log_stream_name - ) - except Exception as e: - logger.error( - f"put_log_events failure: {e!r}\n" - f"log_group_name={session_config.aws_cloudwatch_log_group}, \n" - f"log_stream_name={log_stream_name}" - ) - raise - - def thread_main(self) -> NoReturn: - """Main entry for running this iot_logger in a thread.""" - while True: - # merge message - message_dict: dict[str, list[LogMessage]] = {} - - _merge_count = 0 - while _merge_count < self._max_logs_per_merge: - _queue = self._queue - try: - log_stream_suffix, message = _queue.get_nowait() - _merge_count += 1 - - if log_stream_suffix not in message_dict: - message_dict[log_stream_suffix] = [] - message_dict[log_stream_suffix].append(message) - except Empty: - break - - for log_stream_suffix, logs in message_dict.items(): - self.send_messages(log_stream_suffix, logs) - - time.sleep(self._interval) - - -def start_sending_msg_thread(iot_logger: AWSIoTLogger) -> Thread: - _thread = Thread(target=iot_logger.thread_main, daemon=True) - _thread.start() - logger.debug("iot logger started") - return _thread diff --git 
a/otaclient_iot_logging_server/boto3_session.py b/otaclient_iot_logging_server/boto3_session.py deleted file mode 100644 index af0e718..0000000 --- a/otaclient_iot_logging_server/boto3_session.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from __future__ import annotations - -import json -import logging - -import pycurl -from boto3 import Session -from botocore.credentials import DeferredRefreshableCredentials -from botocore.session import get_session as get_botocore_session - -from otaclient_iot_logging_server._common import Credentials -from otaclient_iot_logging_server.greengrass_config import IoTSessionConfig - -logger = logging.getLogger(__name__) - - -class Boto3Session: - """A refreshable boto3 session with pkcs11. 
- - Reference: - https://github.com/awslabs/aws-iot-core-credential-provider-session-helper/blob/main/src/awsiot_credentialhelper/boto3_session.py - """ - - def __init__(self, config: IoTSessionConfig) -> None: - self._config = config - - def get_session(self, **kwargs) -> Session: - session = get_botocore_session() - # NOTE: session does have an attribute named _credentials - session._credentials = DeferredRefreshableCredentials( # type: ignore - method="sts-assume-role", - refresh_using=self._get_credentials, - ) - session.set_config_variable("region", self._config.region) - - # set other configs if any - for k, v in kwargs.items(): - session.set_config_variable(k, v) - return Session(botocore_session=session) - - def _get_credentials(self) -> Credentials: - """Get credentials using mtls from credential_endpoint.""" - gg_config = self._config - connection = pycurl.Curl() - connection.setopt(pycurl.URL, gg_config.aws_credential_refresh_url) - - # ------ client auth option ------ # - # TPM2.0 support, if private_key is provided as pkcs11 URI, - # enable to use pkcs11 interface from openssl. 
- _enable_pkcs11_engine = False - if gg_config.private_key_path.startswith("pkcs11:"): - _enable_pkcs11_engine = True - connection.setopt(pycurl.SSLKEYTYPE, "eng") - connection.setopt(pycurl.SSLKEY, gg_config.private_key_path) - - if gg_config.certificate_path.startswith("pkcs11:"): - _enable_pkcs11_engine = True - connection.setopt(pycurl.SSLCERTTYPE, "eng") - connection.setopt(pycurl.SSLCERT, gg_config.certificate_path) - - if _enable_pkcs11_engine: - connection.setopt(pycurl.SSLENGINE, "pkcs11") - - # ------ server auth option ------ # - connection.setopt(pycurl.SSL_VERIFYPEER, 1) - connection.setopt(pycurl.CAINFO, gg_config.ca_path) - connection.setopt(pycurl.CAPATH, None) - connection.setopt(pycurl.SSL_VERIFYHOST, 2) - - # ------ set required header ------ # - headers = [f"x-amzn-iot-thingname:{gg_config.thing_name}"] - connection.setopt(pycurl.HTTPHEADER, headers) - - # ------ execute the request and parse creds ------ # - response = connection.perform_rs() - status = connection.getinfo(pycurl.HTTP_CODE) - connection.close() - - if status // 100 != 2: - _err_msg = f"failed to get cred: {status=}" - logger.debug(_err_msg) - raise ValueError(_err_msg) - - try: - response_json = json.loads(response) - assert isinstance(response_json, dict), "response is not a json object" - except Exception as e: - _err_msg = f"cred response is invalid: {e!r}\nresponse={response}" - logger.debug(_err_msg) - raise ValueError(_err_msg) - - try: - _creds = response_json["credentials"] - creds = Credentials( - access_key=_creds["accessKeyId"], - secret_key=_creds["secretAccessKey"], - token=_creds["sessionToken"], - expiry_time=_creds["expiration"], - ) - logger.debug(f"loaded credential={creds}") - return creds - except Exception as e: - _err_msg = f"failed to create Credentials object from response: {e!r}\nresponse_json={response_json}" - logger.debug(_err_msg) - raise ValueError(_err_msg) diff --git a/pyproject.toml b/pyproject.toml index 6450a79..5ab2d7f 100644 --- 
a/pyproject.toml +++ b/pyproject.toml @@ -1,12 +1,16 @@ [build-system] -requires = ["hatchling>=1.20.0", "hatch-vcs"] build-backend = "hatchling.build" +requires = [ + "hatch-vcs", + "hatchling>=1.20", +] [project] -name = "otaclient_iot_logging_server" +name = "otaclient-iot-logging-server" +description = "A logging server that uploads logs sent from otaclient to AWS cloudwatch." readme = "README.md" -requires-python = ">=3.8" license = { text = "LICENSE.md" } +requires-python = ">=3.8" classifiers = [ "License :: OSI Approved :: Apache Software License", "Operating System :: Unix", @@ -15,81 +19,58 @@ classifiers = [ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dynamic = [ + "version", ] dependencies = [ - "aiohttp>=3.9.2, <3.10.0", - "boto3==1.34.35", - "botocore==1.34.35", - "pydantic==2.6.0", - "pydantic-settings==2.1.0", - "pycurl==7.45.1", + "aiohttp<3.10.0,>=3.9.5", + "awsiot_credentialhelper<0.7.0,>=0.6", + "boto3<1.35.0,>=1.34.35", + "botocore<1.35.0,>=1.34.35", + "pydantic==2.7", + "pydantic-settings==2.2.1", + "pyopenssl<25.0.0,>=24.1", "pyyaml==6.0.1", - "typing_extensions>=4.0", + "typing_extensions>=4", ] -dynamic = ["version"] -description = "A logging server that uploads logs sent from otaclient to AWS cloudwatch."
- [project.optional-dependencies] dev = [ "black==24.1.1", "coverage==7.4.1", - "flake8==6.1.0", + "flake8==6.1", "isort==5.13.2", "pytest==7.4.4", "pytest-asyncio==0.23.4", - "pytest-mock==3.12.0", - + "pytest-env==1.1.3", + "pytest-mock==3.12", ] - -[project.scripts] -iot_logging_server = "otaclient_iot_logging_server.__main__:main" - [project.urls] +Homepage = "https://github.com/tier4/otaclient-iot-logging-server" Source = "https://github.com/tier4/otaclient-iot-logging-server" - -[tool.black] -line-length = 88 -target-version = ['py311'] - -[tool.coverage.run] -branch = false -include = ["otaclient_iot_logging_server/**/*.py"] - -[tool.coverage.report] -exclude_also = [ - "def __repr__", - "if cfg.DEBUG_MODE", - "if __name__ == .__main__.:", - "if TYPE_CHECKING:", - "class .*\\bProtocol\\):", - "@(abc\\.)?abstractmethod", -] -show_missing = true -skip_covered = true -skip_empty = true +[project.scripts] +iot_logging_server = "otaclient_iot_logging_server.__main__:main" [tool.hatch.envs.dev] +type = "virtual" features = ["dev"] -[tool.hatch.envs.dev.env-vars] -AWS_PROFILE_INFO = "tests/data/aws_profile_info.yaml" -GREENGRASS_V1_CONFIG = "tests/data/gg_v1_cfg.json" -GREENGRASS_V2_CONFIG = "tests/data/gg_v2_cfg.yaml" -SERVER_LOGGING_LEVEL = "10" # debug -UPLOAD_INTERVAL = "6" - [tool.hatch.version] source = "vcs" [tool.hatch.build.hooks.vcs] -version-file = "otaclient_iot_logging_server/_version.py" +version-file = "src/otaclient_iot_logging_server/_version.py" [tool.hatch.build.targets.sdist] exclude = ["/.github", "/docs"] [tool.hatch.build.targets.wheel] -only-include = ["otaclient_iot_logging_server"] -sources = ["otaclient_iot_logging_server"] +only-include = ["src"] +sources = ["src"] + +[tool.black] +line-length = 88 [tool.isort] atomic = true @@ -99,15 +80,32 @@ lines_before_imports = 2 skip_gitignore = true known_first_party = ["otaclient_iot_logging_server"] -[tool.pyright] -exclude = ["**/__pycache__"] -pythonVersion = "3.11" - 
[tool.pytest.ini_options] +env = [ + "AWS_PROFILE_INFO=tests/data/aws_profile_info.yaml", + "GREENGRASS_V1_CONFIG=tests/data/gg_v1_cfg.json", + "GREENGRASS_V2_CONFIG=tests/data/gg_v2_cfg.yaml", +] asyncio_mode = "auto" log_auto_indent = true log_format = "%(asctime)s %(levelname)s %(filename)s %(funcName)s,%(lineno)d %(message)s" log_cli = true log_cli_level = "INFO" -pythonpath = ["otaclient_iot_logging_server"] -testpaths = ["./tests"] +pythonpath = ["src"] +testpaths = ["tests"] + +[tool.coverage.run] +branch = true +source = ["otaclient_iot_logging_server"] +relative_files = true + +[tool.coverage.report] +exclude_also = [ + "def __repr__", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", + "class .*\\bProtocol\\):", + "@(abc\\.)?abstractmethod", +] +show_missing = true +skip_empty = true diff --git a/sonar-project.properties b/sonar-project.properties new file mode 100644 index 0000000..943211c --- /dev/null +++ b/sonar-project.properties @@ -0,0 +1,7 @@ +sonar.organization=tier4 +sonar.projectKey=tier4_otaclient-iot-logging-server +sonar.python.coverage.reportPaths=test_result/coverage.xml +sonar.sources=./src +sonar.tests=tests +sonar.sourceEncoding=UTF-8 +sonar.python.version=3.8,3.9,3.10,3.11 diff --git a/src/otaclient_iot_logging_server/.gitignore b/src/otaclient_iot_logging_server/.gitignore new file mode 100644 index 0000000..6a8d699 --- /dev/null +++ b/src/otaclient_iot_logging_server/.gitignore @@ -0,0 +1,2 @@ +# ignore generated version file +_version.py diff --git a/otaclient_iot_logging_server/__init__.py b/src/otaclient_iot_logging_server/__init__.py similarity index 88% rename from otaclient_iot_logging_server/__init__.py rename to src/otaclient_iot_logging_server/__init__.py index 665c449..40bfe09 100644 --- a/otaclient_iot_logging_server/__init__.py +++ b/src/otaclient_iot_logging_server/__init__.py @@ -11,9 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. -# flake8: noqa -from _version import version as __version__ # type: ignore +from otaclient_iot_logging_server._version import __version__ package_name = __name__.split(".")[0] +version = __version__ diff --git a/src/otaclient_iot_logging_server/__main__.py b/src/otaclient_iot_logging_server/__main__.py new file mode 100644 index 0000000..0e4e49b --- /dev/null +++ b/src/otaclient_iot_logging_server/__main__.py @@ -0,0 +1,55 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +from queue import Queue + +from otaclient_iot_logging_server import __version__ +from otaclient_iot_logging_server._common import LogsQueue +from otaclient_iot_logging_server._log_setting import config_logging +from otaclient_iot_logging_server.aws_iot_logger import start_aws_iot_logger_thread +from otaclient_iot_logging_server.config_file_monitor import config_file_monitor_thread +from otaclient_iot_logging_server.configs import server_cfg +from otaclient_iot_logging_server.log_proxy_server import launch_server + + +def main() -> None: + # server scope log entries pipe + queue: LogsQueue = Queue(maxsize=server_cfg.MAX_LOGS_BACKLOG) + # ------ configure local logging ------ # + root_logger = config_logging( + queue, + format=server_cfg.SERVER_LOGGING_LOG_FORMAT, + level=server_cfg.SERVER_LOGGING_LEVEL, + enable_server_log=server_cfg.UPLOAD_LOGGING_SERVER_LOGS, + server_logstream_suffix=server_cfg.SERVER_LOGSTREAM_SUFFIX, + ) + + root_logger.info( + f"launching iot_logging_server({__version__}) at http://{server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT}" + ) + root_logger.info(f"iot_logging_server config: \n{server_cfg}") + # ------ launch aws cloudwatch client ------ # + start_aws_iot_logger_thread(queue) + # ------ launch config file monitor ------ # + if server_cfg.EXIT_ON_CONFIG_FILE_CHANGED: + config_file_monitor_thread() + # ------ start server ------ # + launch_server(queue=queue) # NoReturn + + +if __name__ == "__main__": + main() diff --git a/otaclient_iot_logging_server/_common.py b/src/otaclient_iot_logging_server/_common.py similarity index 57% rename from otaclient_iot_logging_server/_common.py rename to src/otaclient_iot_logging_server/_common.py index 47bd6af..438e418 100644 --- a/otaclient_iot_logging_server/_common.py +++ b/src/otaclient_iot_logging_server/_common.py @@ -15,9 +15,12 @@ from __future__ import annotations -from typing import TypedDict +from queue import Queue +from typing import Literal, 
TypedDict -from typing_extensions import NotRequired +from typing_extensions import NotRequired, TypeAlias + +LogsQueue: TypeAlias = "Queue[tuple[str, LogMessage]]" class LogMessage(TypedDict): @@ -32,8 +35,18 @@ class LogEvent(TypedDict): sequenceToken: NotRequired[str] -class Credentials(TypedDict): - access_key: str - secret_key: str - token: str - expiry_time: str +PKCS11URI = TypedDict( + "PKCS11URI", + { + "object": str, + "pin-value": NotRequired[str], + "token": NotRequired[str], + "type": NotRequired[Literal["cert", "private"]], + }, +) +""" +NOTE: Not all possible segments are defined here. + see https://www.rfc-editor.org/rfc/rfc7512.html for more details. + In normal case, (priv_key_label) is enough, as long as there is + only one private key inside the slot. +""" diff --git a/src/otaclient_iot_logging_server/_log_setting.py b/src/otaclient_iot_logging_server/_log_setting.py new file mode 100644 index 0000000..e18c19f --- /dev/null +++ b/src/otaclient_iot_logging_server/_log_setting.py @@ -0,0 +1,80 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +import contextlib +import logging +import time +from queue import Queue + +from otaclient_iot_logging_server import package_name as root_package_name +from otaclient_iot_logging_server._common import LogMessage +from otaclient_iot_logging_server.configs import server_cfg + + +class _LogTeeHandler(logging.Handler): + """Implementation of uploading local server loggings to cloudwatch.""" + + def __init__( + self, + queue: Queue[tuple[str, LogMessage]], + logstream_suffix: str, + ) -> None: + super().__init__() + self._queue = queue + self._logstream_suffix = logstream_suffix + + def emit(self, record: logging.LogRecord) -> None: + with contextlib.suppress(Exception): + self._queue.put_nowait( + ( + self._logstream_suffix, + LogMessage( + timestamp=int(time.time()) * 1000, # milliseconds + message=self.format(record), + ), + ) + ) + + +def config_logging( + queue: Queue[tuple[str, LogMessage]], + *, + format: str, + level: str, + enable_server_log: bool, + server_logstream_suffix: str, +): + # NOTE: for the root logger, set to CRITICAL to filter away logs from other + # external modules unless reached CRITICAL level. 
+ logging.basicConfig(level=logging.CRITICAL, format=format, force=True) + # NOTE: set the <level> to the package root logger + root_logger = logging.getLogger(root_package_name) + root_logger.setLevel(level) + + if enable_server_log and server_logstream_suffix: + _tee_handler = _LogTeeHandler( + queue=queue, + logstream_suffix=server_logstream_suffix, + ) + _fmt = logging.Formatter(fmt=server_cfg.SERVER_LOGGING_LOG_FORMAT) + _tee_handler.setFormatter(_fmt) + + # attach the log tee handler to the root logger + root_logger.addHandler(_tee_handler) + root_logger.info(f"enable server logs upload with {server_logstream_suffix=}") + + return root_logger diff --git a/otaclient_iot_logging_server/_utils.py b/src/otaclient_iot_logging_server/_utils.py similarity index 76% rename from otaclient_iot_logging_server/_utils.py rename to src/otaclient_iot_logging_server/_utils.py index 3c68742..8366730 100644 --- a/otaclient_iot_logging_server/_utils.py +++ b/src/otaclient_iot_logging_server/_utils.py @@ -17,11 +17,13 @@ import time from functools import partial, wraps -from typing import Any, Callable, Optional, TypeVar +from typing import Any, Callable, Optional, TypeVar, overload from pydantic import BaseModel, ConfigDict from typing_extensions import ParamSpec, TypeAlias + +from otaclient_iot_logging_server._common import PKCS11URI + RT = TypeVar("RT") P = ParamSpec("P") NestedDict: TypeAlias = "dict[str, Any | 'NestedDict']" @@ -34,7 +36,7 @@ class FixedConfig(BaseModel): _MISSING = object() -def chain_query(_obj: NestedDict, *_paths: str, default=_MISSING) -> Any: +def chain_query(_obj: NestedDict, *_paths: str, default: object = _MISSING) -> Any: """Chain access a nested dict <_obj> according to search <_paths>.
For example: @@ -71,6 +73,28 @@ def chain_query(_obj: NestedDict, *_paths: str, default=_MISSING) -> Any: raise ValueError(f"chain query with {_paths=} failed: {e!r}") from e +@overload +def retry( + func: None = None, + /, + backoff_factor: float = 0.1, + backoff_max: int = 6, + max_retry: int = 6, + retry_on_exceptions: tuple[type[Exception], ...] = (Exception,), +) -> partial[Any]: ... + + +@overload +def retry( + func: Callable[P, RT], + /, + backoff_factor: float = ..., + backoff_max: int = ..., + max_retry: int = ..., + retry_on_exceptions: tuple[type[Exception], ...] = ..., +) -> Callable[P, RT]: ... + + def retry( func: Optional[Callable[P, RT]] = None, /, @@ -111,3 +135,12 @@ def remove_prefix(_str: str, _prefix: str) -> str: if _str.startswith(_prefix): return _str.replace(_prefix, "", 1) return _str + + +def parse_pkcs11_uri(_pkcs11_uri: str) -> PKCS11URI: + _, pkcs11_opts_str = _pkcs11_uri.split(":", maxsplit=1) + pkcs11_opts_dict: dict[str, Any] = {} + for opt in pkcs11_opts_str.split(";"): + k, v = opt.split("=", maxsplit=1) + pkcs11_opts_dict[k] = v + return PKCS11URI(**pkcs11_opts_dict) diff --git a/src/otaclient_iot_logging_server/aws_iot_logger.py b/src/otaclient_iot_logging_server/aws_iot_logger.py new file mode 100644 index 0000000..b017855 --- /dev/null +++ b/src/otaclient_iot_logging_server/aws_iot_logger.py @@ -0,0 +1,207 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +import contextlib +import logging +import time +from collections import defaultdict +from datetime import datetime, timezone +from queue import Empty +from threading import Thread + +import awscrt.exceptions +from typing_extensions import NoReturn + +from otaclient_iot_logging_server._common import LogEvent, LogMessage, LogsQueue +from otaclient_iot_logging_server._utils import retry +from otaclient_iot_logging_server.boto3_session import get_session +from otaclient_iot_logging_server.configs import server_cfg +from otaclient_iot_logging_server.greengrass_config import ( + IoTSessionConfig, + parse_config, +) + +logger = logging.getLogger(__name__) + + +def get_log_stream_name(thing_name: str, log_stream_sufix: str) -> str: + """Compose LogStream name. + + Schema: YYYY/MM/DD/<thing_name>/<log_stream_sufix> + """ + fmt = "{strftime:%Y/%m/%d}".format(strftime=datetime.now(timezone.utc)) + return f"{fmt}/{thing_name}/{log_stream_sufix}" + + +class AWSIoTLogger: + """ + Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html + """ + + # this upper bound is defined by boto3, check doc for more details.
+ MAX_LOGS_PER_PUT = 10_000 + + def __init__( + self, + session_config: IoTSessionConfig, + queue: LogsQueue, + max_logs_per_merge: int, + interval: int, + ): + _boto3_session = get_session(session_config) + self._client = client = _boto3_session.client(service_name="logs") + self._exc_types = client.exceptions + + self._session_config = session_config + self._log_group_name = session_config.aws_cloudwatch_log_group + self._interval = interval + self._queue: LogsQueue = queue + # NOTE: add this limitation to ensure all of the log_streams in a merge + # will definitely have entries less than MAX_LOGS_PER_PUT + self._max_logs_per_merge = min(max_logs_per_merge, self.MAX_LOGS_PER_PUT) + + @retry(max_retry=16, backoff_factor=2, backoff_max=32) + def _create_log_group(self): + # TODO: (20240214) should we let the edge side iot_logging_server + # create the log group? + log_group_name, client = self._log_group_name, self._client + exc_types = self._exc_types + try: + client.create_log_group(logGroupName=log_group_name) + logger.info(f"{log_group_name=} has been created") + except exc_types.ResourceAlreadyExistsException as e: + logger.debug( + f"{log_group_name=} already existed, skip creating: {e.response}" + ) + except ValueError as e: + if e.__cause__ and isinstance(e.__cause__, awscrt.exceptions.AwsCrtError): + logger.error( + (f"failed to create mtls connection to remote: {e.__cause__}") + ) + raise e.__cause__ + logger.error(f"failed to create {log_group_name=}: {e!r}") + raise + except Exception as e: + logger.error(f"failed to create {log_group_name=}: {e!r}") + raise + + @retry(max_retry=16, backoff_factor=2, backoff_max=32) + def _create_log_stream(self, log_stream_name: str): + log_group_name, client = self._log_group_name, self._client + exc_types = self._exc_types + try: + client.create_log_stream( + logGroupName=log_group_name, + logStreamName=log_stream_name, + ) + logger.info(f"{log_stream_name=}@{log_group_name} has been created") + except 
exc_types.ResourceAlreadyExistsException as e: + logger.debug( + f"{log_stream_name=}@{log_group_name} already existed, skip creating: {e.response}" + ) + except ValueError as e: + if e.__cause__ and isinstance(e.__cause__, awscrt.exceptions.AwsCrtError): + logger.error( + (f"failed to create mtls connection to remote: {e.__cause__}") + ) + raise e.__cause__ + logger.error(f"failed to create {log_stream_name=}@{log_group_name}: {e!r}") + raise + except Exception as e: + logger.error(f"failed to create {log_stream_name=}@{log_group_name}: {e!r}") + raise + + @retry(backoff_factor=2) + def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]): + """ + Ref: + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html + + NOTE: sequence token is not needed and ignored by PutLogEvents action now. See the documentation for more details. + NOTE: The sequenceToken parameter is now ignored in PutLogEvents actions. PutLogEvents actions are now accepted + and never return InvalidSequenceTokenException or DataAlreadyAcceptedException even if the sequence token is not valid. + See the documentation for more details. + """ + request = LogEvent( + logGroupName=self._log_group_name, + logStreamName=log_stream_name, + logEvents=message_list, + ) + + exc_types, client = self._exc_types, self._client + try: + client.put_log_events(**request) + # logger.debug(f"successfully uploaded: {response}") + except exc_types.ResourceNotFoundException as e: + logger.debug(f"{log_stream_name=} not found: {e!r}") + self._create_log_stream(log_stream_name) + raise + except Exception as e: + # NOTE: for unhandled exception, we just log it and ignore, + # leave for the developer to properly handle it + # in the future! 
+ logger.error( + f"put_log_events failure: {e!r}\n" + f"log_group_name={self._log_group_name}, \n" + f"log_stream_name={log_stream_name}" + ) + + def thread_main(self) -> NoReturn: + """Main entry for running this iot_logger in a thread.""" + # unconditionally create log_group and log_stream, do nothing if existed. + self._create_log_group() + + while True: + # merge LogMessages into the same source, identified by + # log_stream_suffix. + message_dict: dict[str, list[LogMessage]] = defaultdict(list) + + _merge_count = 0 + while _merge_count < self._max_logs_per_merge: + _queue = self._queue + try: + log_stream_suffix, message = _queue.get_nowait() + _merge_count += 1 + + message_dict[log_stream_suffix].append(message) + except Empty: + break + + for log_stream_suffix, logs in message_dict.items(): + with contextlib.suppress(Exception): + self.put_log_events( + get_log_stream_name( + self._session_config.thing_name, log_stream_suffix + ), + logs, + ) + # don't let the exception breaks the main loop + time.sleep(self._interval) + + +def start_aws_iot_logger_thread(queue: LogsQueue) -> Thread: + iot_logger = AWSIoTLogger( + session_config=parse_config(), + queue=queue, + max_logs_per_merge=server_cfg.MAX_LOGS_PER_MERGE, + interval=server_cfg.UPLOAD_INTERVAL, + ) + + _thread = Thread(target=iot_logger.thread_main, daemon=True) + _thread.start() + logger.debug("iot logger thread started") + return _thread diff --git a/src/otaclient_iot_logging_server/boto3_session.py b/src/otaclient_iot_logging_server/boto3_session.py new file mode 100644 index 0000000..4abd7d7 --- /dev/null +++ b/src/otaclient_iot_logging_server/boto3_session.py @@ -0,0 +1,144 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import subprocess +from pathlib import Path +from typing import Optional + +from awsiot_credentialhelper.boto3_session import Boto3SessionProvider +from awsiot_credentialhelper.boto3_session import Pkcs11Config as aws_PKcs11Config +from boto3 import Session +from OpenSSL import crypto + +from otaclient_iot_logging_server._utils import parse_pkcs11_uri +from otaclient_iot_logging_server.greengrass_config import ( + IoTSessionConfig, + PKCS11Config, +) + +# +# ------ certificate loading helpers ------ # +# + + +def _load_pkcs11_cert( + pkcs11_lib: str, + slot_id: str, + private_key_label: str, + user_pin: Optional[str] = None, +) -> bytes: + """Load certificate from a pkcs11 interface(backed by a TPM2.0 chip). + + This function requires opensc and libtpm2-pkcs11-1 to be installed, + and a properly setup and working TPM2.0 chip. 
+ """ + # fmt: off + _cmd = [ + "/usr/bin/pkcs11-tool", + "--module", pkcs11_lib, + "--type", "cert", + "--slot", slot_id, + "--label", private_key_label, + "--read-object", + ] + if user_pin: + _cmd.extend(["--pin", user_pin]) + # fmt: on + return subprocess.check_output(_cmd) + + +def _convert_to_pem(_data: bytes) -> bytes: + """Unconditionally convert input cert to PEM format.""" + if _data.startswith(b"-----BEGIN CERTIFICATE-----"): + return _data + return crypto.dump_certificate( + crypto.FILETYPE_PEM, + crypto.load_certificate(crypto.FILETYPE_ASN1, _data), + ) + + +def _load_certificate(cert_path: str, pkcs11_cfg: Optional[PKCS11Config]) -> bytes: + """ + NOTE: Boto3SessionProvider only takes PEM format cert. + """ + if cert_path.startswith("pkcs11"): + assert ( + pkcs11_cfg + ), "certificate is provided by pkcs11, but no pkcs11_cfg is not available" + + _parsed_cert_uri = parse_pkcs11_uri(cert_path) + # NOTE: the cert pull from pkcs11 interface is in DER format + return _convert_to_pem( + _load_pkcs11_cert( + pkcs11_lib=pkcs11_cfg.pkcs11_lib, + slot_id=pkcs11_cfg.slot_id, + private_key_label=_parsed_cert_uri["object"], + user_pin=pkcs11_cfg.user_pin, + ) + ) + return _convert_to_pem(Path(cert_path).read_bytes()) + + +# +# ------ session creating helpers ------ # +# + + +def _get_session(config: IoTSessionConfig) -> Session: + """Get a session that using plain privkey.""" + return Boto3SessionProvider( + endpoint=config.aws_credential_provider_endpoint, + role_alias=config.aws_role_alias, + certificate=_load_certificate(config.certificate_path, config.pkcs11_config), + private_key=config.private_key_path, + thing_name=config.thing_name, + ).get_session() # type: ignore + + +def _get_session_pkcs11(config: IoTSessionConfig) -> Session: + """Get a session backed by privkey provided by pkcs11.""" + assert ( + pkcs11_cfg := config.pkcs11_config + ), "privkey is provided by pkcs11, but pkcs11_config is not available" + + _parsed_key_uri = 
parse_pkcs11_uri(config.private_key_path) + return Boto3SessionProvider( + endpoint=config.aws_credential_provider_endpoint, + role_alias=config.aws_role_alias, + certificate=_load_certificate(config.certificate_path, config.pkcs11_config), + thing_name=config.thing_name, + pkcs11=aws_PKcs11Config( + pkcs11_lib=pkcs11_cfg.pkcs11_lib, + slot_id=int(pkcs11_cfg.slot_id), + user_pin=pkcs11_cfg.user_pin, + private_key_label=_parsed_key_uri.get("object"), + ), + ).get_session() # type: ignore + + +# API + + +def get_session(config: IoTSessionConfig) -> Session: + """Get a boto3 session with givin IoTSessionConfig. + + The behavior changes according to whether privkey is provided by + pkcs11 or by plain file, indicating with URI. + """ + if config.private_key_path.startswith("pkcs11"): + return _get_session_pkcs11(config) + return _get_session(config) diff --git a/src/otaclient_iot_logging_server/config_file_monitor.py b/src/otaclient_iot_logging_server/config_file_monitor.py new file mode 100644 index 0000000..62504b4 --- /dev/null +++ b/src/otaclient_iot_logging_server/config_file_monitor.py @@ -0,0 +1,85 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Monitor the used config files. + +Monitor the files listed in , kill the server +if any of the files are changed. + +This is expected to be used together with systemd.unit Restart policy +to achieve automatically restart on configuration files changed. 
+""" + + +from __future__ import annotations + +import logging +import os +import signal +import threading +import time +from os import stat_result +from pathlib import Path +from typing import NamedTuple, NoReturn + +logger = logging.getLogger(__name__) + +_CHECK_INTERVAL = 3 # second + +monitored_config_files: set[str] = set() +_monitored_files_stat: dict[str, _MCTime] = {} + + +class _MCTime(NamedTuple): + mtime: int + ctime: int + + def file_changed(self, new_mctime: _MCTime) -> bool: + # if create time is newer in , it means the file is recreated. + # if modified time is newer in , it means the file is modified. + return self.ctime < new_mctime.ctime or self.mtime < new_mctime.mtime + + @classmethod + def from_stat(cls, stat: stat_result) -> _MCTime: + return cls(int(stat.st_mtime), int(stat.st_ctime)) + + +def _config_file_monitor() -> NoReturn: + # initialize, record the original status + logger.info(f"start to monitor the changes of {monitored_config_files}") + while True: + for entry in monitored_config_files: + try: + f_stat = Path(entry).stat() + except Exception as e: + logger.debug(f"cannot query stat from {entry}, skip: {e!r}") + continue + + new_f_mctime = _MCTime.from_stat(f_stat) + if entry not in _monitored_files_stat: + _monitored_files_stat[entry] = new_f_mctime + continue + + f_mctime = _monitored_files_stat[entry] + if f_mctime.file_changed(new_f_mctime): + logger.warning(f"detect change on config file {entry}, exit") + # NOTE: sys.exit is not working in thread + os.kill(os.getpid(), signal.SIGINT) + + time.sleep(_CHECK_INTERVAL) + + +def config_file_monitor_thread() -> threading.Thread: + t = threading.Thread(target=_config_file_monitor, daemon=True) + t.start() + return t diff --git a/otaclient_iot_logging_server/configs.py b/src/otaclient_iot_logging_server/configs.py similarity index 62% rename from otaclient_iot_logging_server/configs.py rename to src/otaclient_iot_logging_server/configs.py index d9e3bdd..f2d6796 100644 --- 
a/otaclient_iot_logging_server/configs.py +++ b/src/otaclient_iot_logging_server/configs.py @@ -16,48 +16,59 @@ from __future__ import annotations -import logging from pathlib import Path +from typing import List, Literal import yaml -from pydantic import AnyHttpUrl, BaseModel +from pydantic import BaseModel, BeforeValidator, Field, RootModel from pydantic_settings import BaseSettings, SettingsConfigDict +from typing_extensions import Annotated + +from otaclient_iot_logging_server.config_file_monitor import monitored_config_files + +_LoggingLevelName = Literal["INFO", "DEBUG", "CRITICAL", "ERROR", "WARNING"] class ConfigurableLoggingServerConfig(BaseSettings): model_config = SettingsConfigDict(frozen=True, validate_default=True) - # the default location of greengrass configuration files. # NOTE(20240209): allow user to change this values with env vars, GREENGRASS_V1_CONFIG: str = "/greengrass/config/config.json" GREENGRASS_V2_CONFIG: str = "/greengrass/v2/init_config/config.yaml" - AWS_PROFILE_INFO: str = "/opt/ota/client/aws_profile_info.yaml" + AWS_PROFILE_INFO: str = "/opt/ota/iot-logger/aws_profile_info.yaml" """The path to aws_profile_info.yaml.""" LISTEN_ADDRESS: str = "127.0.0.1" LISTEN_PORT: int = 8083 - SERVER_LOGGING_LEVEL: int = logging.INFO + UPLOAD_LOGGING_SERVER_LOGS: bool = False + SERVER_LOGSTREAM_SUFFIX: str = "iot_logging_server" + SERVER_LOGGING_LEVEL: _LoggingLevelName = "INFO" SERVER_LOGGING_LOG_FORMAT: str = ( "[%(asctime)s][%(levelname)s]-%(name)s:%(funcName)s:%(lineno)d,%(message)s" ) MAX_LOGS_BACKLOG: int = 4096 MAX_LOGS_PER_MERGE: int = 512 - UPLOAD_INTERVAL: int = 60 # in seconds + UPLOAD_INTERVAL: int = 3 # in seconds + + ECU_INFO_YAML: str = "/boot/ota/ecu_info.yaml" + + EXIT_ON_CONFIG_FILE_CHANGED: bool = True + """Kill the server when any config files changed.""" + +class _AWSProfile(BaseModel): + model_config = SettingsConfigDict(frozen=True) + profile_name: str + account_id: Annotated[str, BeforeValidator(str)] = 
Field(pattern=r"^\d{12}$") + credential_endpoint: str -class AWSProfileInfo(BaseModel): - class Profile(BaseModel): - model_config = SettingsConfigDict(frozen=True) - profile_name: str - account_id: str - credential_endpoint: AnyHttpUrl - profiles: list[Profile] +class AWSProfileInfo(RootModel[List[_AWSProfile]]): - def get_profile_info(self, profile_name: str) -> Profile: - for profile in self.profiles: + def get_profile_info(self, profile_name: str) -> _AWSProfile: + for profile in self.root: if profile.profile_name == profile_name: return profile raise KeyError(f"failed to get profile info for {profile_name=}") @@ -70,3 +81,4 @@ def load_profile_info(_cfg_fpath: str) -> AWSProfileInfo: server_cfg = ConfigurableLoggingServerConfig() profile_info = load_profile_info(server_cfg.AWS_PROFILE_INFO) +monitored_config_files.add(server_cfg.AWS_PROFILE_INFO) diff --git a/src/otaclient_iot_logging_server/ecu_info.py b/src/otaclient_iot_logging_server/ecu_info.py new file mode 100644 index 0000000..90ba3b6 --- /dev/null +++ b/src/otaclient_iot_logging_server/ecu_info.py @@ -0,0 +1,75 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ECU metadatas definition and parsing logic. + +Basically the one copied from otaclient, with only parsing fields we care about. 
+""" + + +from __future__ import annotations + +import logging +from functools import cached_property +from pathlib import Path +from typing import List, Optional + +import yaml +from pydantic import BaseModel, ConfigDict, Field, IPvAnyAddress + +from otaclient_iot_logging_server.config_file_monitor import monitored_config_files +from otaclient_iot_logging_server.configs import server_cfg + +logger = logging.getLogger(__name__) + + +class BaseFixedConfig(BaseModel): + model_config = ConfigDict(frozen=True) + + +class ECUContact(BaseFixedConfig): + ecu_id: str + ip_addr: IPvAnyAddress + port: int = 50051 + + +class ECUInfo(BaseFixedConfig): + """ECU info configuration. + + We only need to parse ecu_id and secondaries fields. + """ + + format_version: int = 1 + ecu_id: str + secondaries: List[ECUContact] = Field(default_factory=list) + + @cached_property + def ecu_id_set(self) -> set[str]: + res = [ecu_contact.ecu_id for ecu_contact in self.secondaries] + res.append(self.ecu_id) + return set(res) + + +def parse_ecu_info(ecu_info_file: Path | str) -> Optional[ECUInfo]: + try: + _raw_yaml_str = Path(ecu_info_file).read_text() + loaded_ecu_info = yaml.safe_load(_raw_yaml_str) + assert isinstance(loaded_ecu_info, dict), "not a valid yaml file" + return ECUInfo.model_validate(loaded_ecu_info, strict=True) + except Exception as e: + logger.info(f"{ecu_info_file=} is invalid or missing: {e!r}") + + +ecu_info = parse_ecu_info(server_cfg.ECU_INFO_YAML) +if ecu_info: + monitored_config_files.add(server_cfg.ECU_INFO_YAML) diff --git a/otaclient_iot_logging_server/greengrass_config.py b/src/otaclient_iot_logging_server/greengrass_config.py similarity index 73% rename from otaclient_iot_logging_server/greengrass_config.py rename to src/otaclient_iot_logging_server/greengrass_config.py index f062d43..11536c0 100644 --- a/otaclient_iot_logging_server/greengrass_config.py +++ b/src/otaclient_iot_logging_server/greengrass_config.py @@ -21,31 +21,30 @@ import re from functools import 
partial from pathlib import Path -from typing import NamedTuple +from typing import Any, NamedTuple, Optional from urllib.parse import urljoin import yaml from pydantic import computed_field -from otaclient_iot_logging_server._utils import ( - FixedConfig, - NestedDict, - chain_query, - remove_prefix, -) +from otaclient_iot_logging_server._utils import FixedConfig, chain_query, remove_prefix +from otaclient_iot_logging_server.config_file_monitor import monitored_config_files from otaclient_iot_logging_server.configs import profile_info, server_cfg logger = logging.getLogger(__name__) +THINGNAME_PA = re.compile(r"^(thing[/:])?(?P[\w-]+)-edge-(?P[\w-]+)-.*$") +THINGNAME_MAXLENGH = 128 + len("thing/") +"""ThingName's max length is 128. See https://docs.aws.amazon.com/iot/latest/apireference/API_ThingDocument.html.""" + + def get_profile_from_thing_name(_in: str) -> str: """Get profile from specific thing_name naming scheme. Schema: thing/-edge--Core """ - THINGNAME_PA = re.compile( - r"^(thing[/:])?(?P[\w-]+)-edge-(?P[\w-]+)-.*$" - ) + assert len(_in) <= THINGNAME_MAXLENGH, f"invalid thing_name: {_in}" _ma = THINGNAME_PA.match(_in) assert _ma, f"invalid resource id: {_in}" @@ -94,7 +93,7 @@ def parse_v1_config(_raw_cfg: str) -> IoTSessionConfig: NOTE(20240207): not consider TPM for ggv1. """ - loaded_cfg = json.loads(_raw_cfg) + loaded_cfg: dict[str, Any] = json.loads(_raw_cfg) assert isinstance(loaded_cfg, dict), f"invalid cfg: {_raw_cfg}" _raw_thing_arn = chain_query(loaded_cfg, "coreThing", "thingArn") @@ -124,57 +123,6 @@ def parse_v1_config(_raw_cfg: str) -> IoTSessionConfig: # # ------ v2 configuration parse ------ # # -def _v2_complete_uri(_cfg: NestedDict, _uri: str) -> str: - """Fix up the URI if the URI is pkcs11 URI. - - In gg v2 config, the pin-value(userPin) are specified in - aws.greengrass.crypto.Pkcs11Provider section, so these - option is striped from priv_key/cert URI. 
- - As we will feed the URI to external openssl pkcs11 engine when using - pycurl, we need to add the userPin information back to URI for openssl. - - Example pkcs11 URI schema: - pkcs11:token=;object=;pin-value=;type= - - Args: - _cfg: the dumped config file dict. - _uri: the input uri for completing. - - Returns: - Original input <_uri> if <_uri> is not a pkcs11 URI, else a completed - pkcs11 URI with pin-value inserted. - - Raises: - ValueError on failing complete a pkcs11 URI. - """ - if not _uri.startswith("pkcs11:"): - return _uri - - scheme, pkcs11_opts_str = _uri.split(":", maxsplit=1) - pkcs11_opts_dict = {} - for opt in pkcs11_opts_str.split(";"): - k, v = opt.split("=", maxsplit=1) - pkcs11_opts_dict[k] = v - - try: - user_pin = chain_query( - _cfg, - "services", - "aws.greengrass.crypto.Pkcs11Provider", - "configuration", - "userPin", - ) - pkcs11_opts_dict["pin-value"] = user_pin - except ValueError as e: - raise ValueError( - f"failed to complete pkcs11 URI: {e!r}\nconfig={_cfg}uri=\n{_uri} " - ) - - pkcs11_opts_str = (f"{k}={v}" for k, v in pkcs11_opts_dict.items()) - return f"{scheme}:{';'.join(pkcs11_opts_str)}" - - def parse_v2_config(_raw_cfg: str) -> IoTSessionConfig: """Parse Greengrass V2 config yaml and take what we need. @@ -183,7 +131,7 @@ def parse_v2_config(_raw_cfg: str) -> IoTSessionConfig: https://tier4.atlassian.net/wiki/spaces/HIICS/pages/2544042770/TPM+Ubuntu+22.04+Greengrass+v2. https://datatracker.ietf.org/doc/html/rfc7512. """ - loaded_cfg = yaml.safe_load(_raw_cfg) + loaded_cfg: dict[str, Any] = yaml.safe_load(_raw_cfg) assert isinstance(loaded_cfg, dict), f"invalid cfg: {_raw_cfg}" thing_name = chain_query(loaded_cfg, "system", "thingName") @@ -193,29 +141,40 @@ def parse_v2_config(_raw_cfg: str) -> IoTSessionConfig: # NOTE(20240207): use credential endpoint defined in the config.yml in prior, # only when this information is not available, we use the # <_AWS_CREDENTIAL_PROVIDER_ENDPOINT_MAPPING> to get endpoint. 
- _cred_endpoint: str = chain_query( + _cred_endpoint: str + if _cred_endpoint := chain_query( loaded_cfg, "services", "aws.greengrass.Nucleus", "configuration", "iotCredEndpoint", default=None, - ) - if _cred_endpoint is None: - cred_endpoint = str(this_profile_info.credential_endpoint) + ): + cred_endpoint = _cred_endpoint else: - cred_endpoint = f"https://{_cred_endpoint.rstrip('/')}/" + cred_endpoint = this_profile_info.credential_endpoint + # ------ parse pkcs11 config if any ------ # + _raw_pkcs11_cfg: dict[str, str] + pkcs11_cfg = None + if _raw_pkcs11_cfg := chain_query( + loaded_cfg, + "services", + "aws.greengrass.crypto.Pkcs11Provider", + "configuration", + default=None, + ): + pkcs11_cfg = PKCS11Config( + pkcs11_lib=_raw_pkcs11_cfg["library"], + user_pin=_raw_pkcs11_cfg["userPin"], + slot_id=str(_raw_pkcs11_cfg["slot"]), + ) return IoTSessionConfig( # NOTE: v2 config doesn't include account_id info account_id=this_profile_info.account_id, ca_path=chain_query(loaded_cfg, "system", "rootCaPath"), - private_key_path=_v2_complete_uri( - loaded_cfg, chain_query(loaded_cfg, "system", "privateKeyPath") - ), - certificate_path=_v2_complete_uri( - loaded_cfg, chain_query(loaded_cfg, "system", "certificateFilePath") - ), + private_key_path=chain_query(loaded_cfg, "system", "privateKeyPath"), + certificate_path=chain_query(loaded_cfg, "system", "certificateFilePath"), thing_name=thing_name, profile=this_profile_info.profile_name, region=chain_query( @@ -226,12 +185,23 @@ def parse_v2_config(_raw_cfg: str) -> IoTSessionConfig: "awsRegion", ), aws_credential_provider_endpoint=cred_endpoint, + pkcs11_config=pkcs11_cfg, ) # # ------ main config parser ------ # # +class PKCS11Config(FixedConfig): + """ + See services.aws.greengrass.crypto.Pkcs11Provider section for more details. + """ + + pkcs11_lib: str + slot_id: str + user_pin: str + + class IoTSessionConfig(FixedConfig): """Configurations we need picked from parsed Greengrass V1/V2 configration file. 
@@ -249,6 +219,7 @@ class IoTSessionConfig(FixedConfig): region: str aws_credential_provider_endpoint: str + pkcs11_config: Optional[PKCS11Config] = None @computed_field @property @@ -271,7 +242,7 @@ def aws_cloudwatch_log_group(self) -> str: def aws_credential_refresh_url(self) -> str: """The endpoint to refresh token from.""" return urljoin( - self.aws_credential_provider_endpoint, + f"https://{self.aws_credential_provider_endpoint.rstrip('/')}/", f"role-aliases/{self.aws_role_alias}/credentials", ) @@ -285,10 +256,12 @@ def parse_config() -> IoTSessionConfig: if (_v2_cfg_f := Path(server_cfg.GREENGRASS_V2_CONFIG)).is_file(): _v2_cfg = parse_v2_config(_v2_cfg_f.read_text()) logger.debug(f"gg config v2 is in used: {_v2_cfg}") + monitored_config_files.add(server_cfg.GREENGRASS_V2_CONFIG) return _v2_cfg _v1_cfg = parse_v1_config(Path(server_cfg.GREENGRASS_V1_CONFIG).read_text()) logger.debug(f"gg config v1 is in used: {_v1_cfg}") + monitored_config_files.add(server_cfg.GREENGRASS_V1_CONFIG) return _v1_cfg except Exception as e: _msg = f"failed to parse config: {e!r}" diff --git a/otaclient_iot_logging_server/log_proxy_server.py b/src/otaclient_iot_logging_server/log_proxy_server.py similarity index 59% rename from otaclient_iot_logging_server/log_proxy_server.py rename to src/otaclient_iot_logging_server/log_proxy_server.py index e57bf19..586301e 100644 --- a/otaclient_iot_logging_server/log_proxy_server.py +++ b/src/otaclient_iot_logging_server/log_proxy_server.py @@ -23,38 +23,45 @@ from aiohttp import web from aiohttp.web import Request -from otaclient_iot_logging_server._common import LogMessage -from otaclient_iot_logging_server.aws_iot_logger import ( - AWSIoTLogger, - start_sending_msg_thread, -) +from otaclient_iot_logging_server._common import LogMessage, LogsQueue from otaclient_iot_logging_server.configs import server_cfg -from otaclient_iot_logging_server.greengrass_config import IoTSessionConfig +from otaclient_iot_logging_server.ecu_info import 
ecu_info logger = logging.getLogger(__name__) class LoggingPostHandler: - """A simple aiohttp server handler that receives logs from otaclient. + """A simple aiohttp server handler that receives logs from otaclient.""" - This server listen POST requests on /, and then package the - incoming posted into LogMessage instance as follow: - log_msg = LogMessage(timestamp=, message=) - and then push the instance into queue for aws_iot_logger - to process and upload to AWS cloudwatch. - """ - - def __init__(self, queue: Queue[tuple[str, LogMessage]]) -> None: + def __init__(self, queue: LogsQueue) -> None: self._queue = queue + self._allowed_ecus = None + + if ecu_info: + self._allowed_ecus = ecu_info.ecu_id_set + logger.info( + f"setup allowed_ecu_id from ecu_info.yaml: {ecu_info.ecu_id_set}" + ) + else: + logger.warning( + "no ecu_info.yaml presented, logging upload filtering is DISABLED" + ) # route: POST /{ecu_id} - async def _logging_post_handler(self, request: Request): + async def logging_post_handler(self, request: Request): """ NOTE: use as log_stream_suffix, each ECU has its own logging stream for uploading. """ _ecu_id = request.match_info["ecu_id"] _raw_logging = await request.text() + _allowed_ecus = self._allowed_ecus + + # don't allow empty request or unknowned ECUs + # if ECU id is unknown(not listed in ecu_info.yaml), drop this log. 
+ if not _raw_logging or (_allowed_ecus and _ecu_id not in _allowed_ecus): + return web.Response(status=HTTPStatus.BAD_REQUEST) + _logging_msg = LogMessage( timestamp=int(time.time()) * 1000, # milliseconds message=_raw_logging, @@ -70,25 +77,10 @@ async def _logging_post_handler(self, request: Request): return web.Response(status=HTTPStatus.OK) -def launch_server( - session_config: IoTSessionConfig, - max_logs_backlog: int, - max_logs_per_merge: int, - interval: int, -): - queue = Queue(maxsize=max_logs_backlog) - start_sending_msg_thread( - AWSIoTLogger( - session_config=session_config, - queue=queue, - max_logs_per_merge=max_logs_per_merge, - interval=interval, - ) - ) - +def launch_server(queue: Queue[tuple[str, LogMessage]]) -> None: handler = LoggingPostHandler(queue=queue) app = web.Application() - app.add_routes([web.post(r"/{ecu_id}", handler._logging_post_handler)]) + app.add_routes([web.post(r"/{ecu_id}", handler.logging_post_handler)]) - # actual launch the server and serving - web.run_app(app, host=server_cfg.LISTEN_ADDRESS, port=server_cfg.LISTEN_PORT) + # typing: run_app is a NoReturn method, unless received signal + web.run_app(app, host=server_cfg.LISTEN_ADDRESS, port=server_cfg.LISTEN_PORT) # type: ignore diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..bcfd866 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..1de5753 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,20 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pathlib import Path + +TEST_PACKAGE = Path(__file__).parent + +TEST_DATA_DPATH = TEST_PACKAGE / "data" diff --git a/tests/data/aws_profile_info.yaml b/tests/data/aws_profile_info.yaml index 60014c5..16a374e 100644 --- a/tests/data/aws_profile_info.yaml +++ b/tests/data/aws_profile_info.yaml @@ -1,9 +1,9 @@ - profile_name: "profile-dev" account_id: "012345678901" - credential_endpoint: "https://abcdefghijk01.credentials.iot.region.amazonaws.com/" + credential_endpoint: "abcdefghijk01.credentials.iot.region.amazonaws.com" - profile_name: "profile-stg" account_id: "012345678902" - credential_endpoint: "https://abcdefghijk02.credentials.iot.region.amazonaws.com/" + credential_endpoint: "abcdefghijk02.credentials.iot.region.amazonaws.com" - profile_name: "profile-prd" account_id: "012345678903" - credential_endpoint: "https://abcdefghijk03.credentials.iot.region.amazonaws.com/" \ No newline at end of file + credential_endpoint: "abcdefghijk03.credentials.iot.region.amazonaws.com" diff --git a/tests/data/ecu_info.yaml b/tests/data/ecu_info.yaml new file mode 100644 index 0000000..344595c --- /dev/null +++ b/tests/data/ecu_info.yaml @@ -0,0 +1,15 @@ +format_version: 1 +ecu_id: "main" +bootloader: "grub" +secondaries: + 
- ecu_id: "sub1" + ip_addr: "192.168.10.21" + - ecu_id: "sub2" + ip_addr: "192.168.10.22" + - ecu_id: "sub3" + ip_addr: "192.168.10.23" +available_ecu_ids: + - "main" + - "sub1" + - "sub2" + - "sub3" diff --git a/tests/data/gg_v1_cfg.json b/tests/data/gg_v1_cfg.json index 0dd5b5f..3299fd3 100644 --- a/tests/data/gg_v1_cfg.json +++ b/tests/data/gg_v1_cfg.json @@ -3,7 +3,7 @@ "caPath": "root.ca.pem", "certPath": "gg.cert.pem", "keyPath": "gg.private.key", - "thingArn": "arn:aws:iot:region:012345678901:thing/thing_name", + "thingArn": "arn:aws:iot:region:012345678901:thing/profile-dev-edge-ggv1-Core", "iotHost": "abcdefghijklm-ats.iot.region.amazonaws.com", "ggHost": "greengrass-ats.iot.region.amazonaws.com", "keepAlive": 30 @@ -26,4 +26,4 @@ } } } -} \ No newline at end of file +} diff --git a/tests/data/gg_v2_cfg.yaml b/tests/data/gg_v2_cfg.yaml index 4c87074..f20f542 100644 --- a/tests/data/gg_v2_cfg.yaml +++ b/tests/data/gg_v2_cfg.yaml @@ -3,7 +3,7 @@ system: privateKeyPath: "/greengrass/certs/gg.private.key" rootCaPath: "/greengrass/certs/root.ca.pem" rootpath: "/greengrass/v2" - thingName: "thing_name" + thingName: "profile-dev-edge-ggv2-Core" services: aws.greengrass.Nucleus: componentType: "NUCLEUS" @@ -12,4 +12,4 @@ services: awsRegion: "region" iotRoleAlias: "iot_role_alias" iotDataEndpoint: "abcdefghijklm-ats.iot.region.amazonaws.com" - iotCredEndpoint: "abcdefghijk01.credentials.iot.region.amazonaws.com" \ No newline at end of file + iotCredEndpoint: "abcdefghijk01.credentials.iot.region.amazonaws.com" diff --git a/tests/data/gg_v2_cfg.yaml_tpm2.0 b/tests/data/gg_v2_cfg.yaml_tpm2.0 index c48c76a..815d091 100644 --- a/tests/data/gg_v2_cfg.yaml_tpm2.0 +++ b/tests/data/gg_v2_cfg.yaml_tpm2.0 @@ -1,9 +1,9 @@ system: - certificateFilePath: "pkcs11:object=greengrass;type=cert;pin-value=greengrass" - privateKeyPath: "pkcs11:object=greengrass;type=private;pin-value=greengrass" + certificateFilePath: 
"pkcs11:object=greengrass_key;type=cert;pin-value=greengrass_userpin" + privateKeyPath: "pkcs11:object=greengrass_key;type=private;pin-value=greengrass_userpin" rootCaPath: "/greengrass/certs/root.ca.pem" rootpath: "/greengrass/v2" - thingName: "thing_name" + thingName: "profile-dev-edge-ggv2-Core" services: aws.greengrass.Nucleus: componentType: "NUCLEUS" @@ -18,4 +18,4 @@ services: library: "/usr/lib/x86_64-linux-gnu/pkcs11/libtpm2_pkcs11.so" name: "tpm2_pkcs11" slot: 1 - userPin: "greengrass" \ No newline at end of file + userPin: "greengrass_userpin" diff --git a/tests/data/sample_cert.der b/tests/data/sample_cert.der new file mode 100644 index 0000000..e4e192d Binary files /dev/null and b/tests/data/sample_cert.der differ diff --git a/tests/data/sample_cert.pem b/tests/data/sample_cert.pem new file mode 100644 index 0000000..9e97c7f --- /dev/null +++ b/tests/data/sample_cert.pem @@ -0,0 +1,30 @@ +-----BEGIN CERTIFICATE----- +MIIFJTCCAw2gAwIBAgIUP2ypfC7tc8gGmbUDDnYkXokajwQwDQYJKoZIhvcNAQEL +BQAwIjENMAsGA1UECwwEdGVzdDERMA8GA1UEAwwIdGVzdF9jcnQwHhcNMjQwMjI2 +MDgyMTUzWhcNMzQwMjIzMDgyMTUzWjAiMQ0wCwYDVQQLDAR0ZXN0MREwDwYDVQQD +DAh0ZXN0X2NydDCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAJ451LDo +VGmRIl/hL4hMPlC0vtE2eXhRaLip3+zRFaGTABVJPSM/g7hls+VWWevi7NDVjaIi +7PoGwn4h0AdyIjFBuT26ihn9n65rUFH1wWH0mQP1e3/fCFIhx51s5FF3dkzIhHgY +I6W02D1rqhyKViYr1TLYoqwc5dXribQBQfHvN7D7wMiCG/y0HWFJ4WHL98mIlq36 +nuu+G4U11SIZm6AIhGKOBSiHWbmlMbiwNFj6Cq5wCVUNXGt0mTf0YL3jXXJ1aU8p +rOL4ObI/QsxWr75/bfz6BZSIPuqr+xFjer6LslOjRrrkAFs6oJ/CrKaU+PLEPWCB +Ug2Nr4qdI501LZVXyZUMUIEd+klx3zG7j/0GAm5BZAyfJ9MUVwD9/KpHvuYLPyG0 +vm5LcCGmtsS7+TU8x2eGv92jX5f4K3gt99tBczEyv9xM657vPRWanf0M/gtQsRL2 +cjEO+CpeIaI8XsK9Gm8qTGXCPQFl0N8X5IU+VF/fyblG10+Wef8VeGOX+DaieF6+ +RQicAwVIFDv0JTp6uE74FdmPdv0NkX7ACxrvSzD0DaiA3WQWWxt+IlfDFcW3J9NG +aymTBuzy25Egf+eE6mLz3EmsRxI0DhNhsRLRQkqkBluYXPUR7941VLEyIFWIQaSK +RsnrnPQFcxJpJhVWAU39YM+MGGbdTw0EOUQHAgMBAAGjUzBRMB0GA1UdDgQWBBQ9 +WiAjjsG4XRkPZiAhaEpgVQ70hDAfBgNVHSMEGDAWgBQ9WiAjjsG4XRkPZiAhaEpg 
+VQ70hDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4ICAQBi+lF/sU8+ +iqhQaeP7uI8ozCYDX83EfUa6tYKY7qXt5MkLSBLSJz7Zzmcic+erJ8RjilpMo4MW +EYak8PWIsSfpCUyIMY9DibfNor+0dN86EJuTBGuh1/ypQOh0yffkLZt7pqsPFUaS +c4xaunaMdFBd11f8g8ZVqOeyQnf8waleIELr7nsCjsEQB1xB0eEh8+KxGgur7kpO +tOQVDi4RnCn1/Ed98eHuNMlBj+HAX3YOB66G4DGXJFkkJLk1iBTq7hqC3iXnDqJM +QoY+kFjtfmdUSDDTNRJ2tIhxYYJEFyOOn3W0ci2VIaJ+zfDblUbf12RiP+JbRneS +07Dc+vNi6w/Z7G3tenf9pK6VtadO+bSv2P/0GEWFECeAwudAVNamL7vWNe5A5Rho +GGFJzTuVp8xzZT0bfofYTbHEH320ExLdRkl9b/NVRF3KKrSnZ+Zb0Di+pP/wYIUg +HWkeybTQPg+vqw5Duf7GOo83Q7qvSrkdtstEQv8eB0L6tWjrSpWFDcpKZR/iDiwf ++0/7rJaE7stwWdWdIWIXNBVw14MxTLNgX6hNSMucYdf0Cu7EE/m76UDwyw5+ocP4 +aog34s7uK+TeEHYPnTM6d+jHD9VbHBH92XsxC7k8qzZjzn9gpHe4Uvij0NR2kg/x +7L+WXcC72rV78+bc7vn88Ru6QFjxcjOJvg== +-----END CERTIFICATE----- diff --git a/tests/test__log_setting.py b/tests/test__log_setting.py new file mode 100644 index 0000000..67a4550 --- /dev/null +++ b/tests/test__log_setting.py @@ -0,0 +1,43 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +import logging +from queue import Queue + +import otaclient_iot_logging_server._log_setting +from otaclient_iot_logging_server._common import LogsQueue +from otaclient_iot_logging_server._log_setting import _LogTeeHandler # type: ignore + +MODULE = otaclient_iot_logging_server._log_setting.__name__ + +logger = logging.getLogger(__name__) + + +def test_server_logger(): + _queue: LogsQueue = Queue() + suffix = "test_suffix" + # ------ setup test ------ # + _handler = _LogTeeHandler(_queue, suffix) # type: ignore + logger.addHandler(_handler) + # ------ execution ------ # + logger.info("emit one logging entry") + # ------ clenaup ------ # + logger.removeHandler(_handler) + # ------ check result ------ # + _log = _queue.get_nowait() + assert _log[0] == suffix + assert _log[1] diff --git a/tests/test__main__.py b/tests/test__main__.py new file mode 100644 index 0000000..8ad8004 --- /dev/null +++ b/tests/test__main__.py @@ -0,0 +1,90 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +import logging +from dataclasses import dataclass + +import pytest +from pytest import LogCaptureFixture +from pytest_mock import MockerFixture + +import otaclient_iot_logging_server.__main__ as _main_module + +MODULE = _main_module.__name__ + +logger = logging.getLogger(__name__) + + +@dataclass +class _ServerCfg: + """A minimum set of configs used by main module.""" + + SERVER_LOGGING_LOG_FORMAT: str = "test_format" + SERVER_LOGGING_LEVEL: str = "DEBUG" + UPLOAD_LOGGING_SERVER_LOGS: bool = False + SERVER_LOGSTREAM_SUFFIX: str = "test_suffix" + LISTEN_ADDRESS: str = "172.16.1.1" + LISTEN_PORT: int = 1234 + MAX_LOGS_PER_MERGE: int = 123 + MAX_LOGS_BACKLOG: int = 1234 + UPLOAD_INTERVAL: int = 12 + EXIT_ON_CONFIG_FILE_CHANGED: bool = False + + +@pytest.mark.parametrize("_in_server_cfg, _version", [(_ServerCfg(), "test_version")]) +def test_main( + _in_server_cfg: _ServerCfg, + _version: str, + mocker: MockerFixture, + caplog: LogCaptureFixture, +): + # ------ prepare patching ------ # + mocker.patch( + f"{MODULE}.config_logging", + _logger_mock := mocker.MagicMock(return_value=logger), + ) + mocker.patch( + f"{MODULE}.start_aws_iot_logger_thread", + _aws_iot_logger_mock := mocker.MagicMock(), + ) + mocker.patch( + f"{MODULE}.launch_server", + _launch_server_mock := mocker.MagicMock(), + ) + mocker.patch(f"{MODULE}.__version__", _version) + mocker.patch(f"{MODULE}.server_cfg", _in_server_cfg) + + # ------ execution ------ # + _main_module.main() + + # ------ check result ------ # + _logger_mock.assert_called_once_with( + mocker.ANY, + format=_in_server_cfg.SERVER_LOGGING_LOG_FORMAT, + level=_in_server_cfg.SERVER_LOGGING_LEVEL, + enable_server_log=_in_server_cfg.UPLOAD_LOGGING_SERVER_LOGS, + server_logstream_suffix=_in_server_cfg.SERVER_LOGSTREAM_SUFFIX, + ) + _aws_iot_logger_mock.assert_called_once() + _launch_server_mock.assert_called_once() + + # check __main__.main source code for more details + assert ( + 
caplog.records[-2].msg + == f"launching iot_logging_server({_version}) at http://{_in_server_cfg.LISTEN_ADDRESS}:{_in_server_cfg.LISTEN_PORT}" + ) + assert (caplog.records[-1].msg) == f"iot_logging_server config: \n{_in_server_cfg}" diff --git a/tests/test__utils.py b/tests/test__utils.py new file mode 100644 index 0000000..c0d0ae1 --- /dev/null +++ b/tests/test__utils.py @@ -0,0 +1,260 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import logging +import random +import time +from typing import Any + +import pytest + +from otaclient_iot_logging_server._utils import ( + NestedDict, + chain_query, + parse_pkcs11_uri, + remove_prefix, + retry, +) + +logger = logging.getLogger(__name__) + + +@pytest.mark.parametrize( + "_input, _paths, _expected, _default", + [ + # test#1: succeeded chain_query + ( + {"a": {"b": {"c": {"d": "e"}}}}, + ["a", "b", "c", "d"], + "e", + None, + ), + # test#2: failed chain query with set to "default_value" + ( + {"a": {"b": {"c": {"d": "e"}}}}, + ["non", "existed", "path"], + "default_value", + "default_value", + ), + ], +) +def test_chain_query( + _input: NestedDict, + _paths: str, + _expected: Any, + _default: Any, +): + _queried = chain_query(_input, *_paths, default=_default) + assert _queried == _expected + + +class TestRetry: + + class HandledException(Exception): + pass + + class UnhandledException(Exception): + pass + + @staticmethod + def 
_func_factory( + _max_retry: int, + _return_value: Any, + exception_to_raise: list[Any] | None = None, + ): + # return a func that succeeds in first run + if exception_to_raise is None: + return lambda: _return_value + + execution_round = 0 + exception_replay = iter(exception_to_raise) + + def _func(): + nonlocal execution_round, exception_replay + execution_round += 1 + _exception = next(exception_replay, None) + + if _exception is None: + if execution_round <= _max_retry + 1: + return _return_value + logger.error("retrier doesn't work!") + raise ValueError + logger.info(f"{execution_round=}") + raise _exception + + return _func + + def test_normal_finished(self): + """ + Function returns directly without raising any exception. + """ + return_value = random.randint(10**3, 10**6) + _res = retry( + self._func_factory(0, return_value), + retry_on_exceptions=(self.HandledException,), + )() + assert _res == return_value + + def test_successfully_retried(self): + """ + Function failed for some times, within , but finally succeeded. 
+ """ + return_value = random.randint(10**3, 10**6) + max_retries, actual_retries = 8, 7 + + _res = retry( + self._func_factory( + actual_retries, + return_value, + exception_to_raise=[ + self.HandledException for _ in range(actual_retries) + ], + ), + max_retry=max_retries, + backoff_factor=0.01, # for speeding up test + retry_on_exceptions=(self.HandledException,), + )() + assert _res == return_value + + def test_aborted_by_unhandled_exception(self): + return_value = random.randint(10**3, 10**6) + max_retries, actual_retries = 8, 7 + + with pytest.raises(self.UnhandledException): + retry( + self._func_factory( + actual_retries, + return_value, + exception_to_raise=[ + self.HandledException for _ in range(actual_retries - 1) + ] + + [self.UnhandledException], + ), + max_retry=max_retries, + backoff_factor=0.01, # for speeding up test + retry_on_exceptions=(self.HandledException,), + )() + + def test_aborted_by_exceeded_max_retries(self): + return_value = random.randint(10**3, 10**6) + max_retries, actual_retries = 3, 7 + + with pytest.raises(self.HandledException): + _exceptions = [self.HandledException for _ in range(actual_retries)] + retry( + self._func_factory( + actual_retries, + return_value, + exception_to_raise=_exceptions, + ), + max_retry=max_retries, + backoff_factor=0.01, # for speeding up test + retry_on_exceptions=(self.HandledException,), + )() + + def test_retry_session_timecost(self): + """ + For a retry session with the following configurations: + 1. backoff_factor = 0.1 + 2. backoff_max = 1 + 3. 
max_retry = 8 + We should have the time cost sequence as follow: + 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.0, 1.0 + So the retry session should not take more than 6s(5.1s+) + """ + max_retries, actual_retries = 8, 9 + backoff_factor, backoff_max = 0.1, 1 + # NOTE: add some overhead for function execution + expected_retry_session_timecost = ( + sum(min(backoff_max, backoff_factor * 2**i) for i in range(max_retries)) + + 0.5 + ) + + return_value = random.randint(10**3, 10**6) + with pytest.raises(self.HandledException): + _start_time = time.time() + retry( + self._func_factory( + actual_retries, + return_value, + exception_to_raise=[ + self.HandledException for _ in range(actual_retries) + ], + ), + max_retry=max_retries, + backoff_factor=backoff_factor, + backoff_max=backoff_max, + retry_on_exceptions=(self.HandledException,), + )() + + time_cost = time.time() - _start_time + logger.info(f"{time_cost=}") + assert time_cost <= expected_retry_session_timecost + + +@pytest.mark.parametrize( + "_input, _prefix, _expected", + [ + # test#1: test remove schema from pkcs11 URI + ( + "pkcs11:token=token;object=object;pin-value=pin-value", + "pkcs11:", + "token=token;object=object;pin-value=pin-value", + ), + # test#2: test remove schema from file URI + ( + "file:///path/to/something", + "file://", + "/path/to/something", + ), + ( + "abcabcabcabcabcabcabcabcabc", + "abc", + "abcabcabcabcabcabcabcabc", + ), + ], +) +def test_remove_prefix(_input: str, _prefix: str, _expected: str): + assert remove_prefix(_input, _prefix) == _expected + + +@pytest.mark.parametrize( + "_pkcs11_uri, _expected", + [ + # test#1: TypedDict also accepts unknown keys + ( + "pkcs11:token=token;object=object;slot-id=1;pin-value=pin-value;type=cert", + { + "object": "object", + "token": "token", + "pin-value": "pin-value", + "type": "cert", + "slot-id": "1", + }, + ), + # test#2: minimum pkcs11 sample + ( + "pkcs11:object=object;type=cert", + { + "object": "object", + "type": "cert", + }, + ), + ], +) +def 
test_parse_pkcs11_uri(_pkcs11_uri: str, _expected: dict[str, Any]): + assert parse_pkcs11_uri(_pkcs11_uri) == _expected diff --git a/tests/test_aws_iot_logger.py b/tests/test_aws_iot_logger.py new file mode 100644 index 0000000..378efc5 --- /dev/null +++ b/tests/test_aws_iot_logger.py @@ -0,0 +1,147 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import logging +import os +import random +import time +from collections import defaultdict +from datetime import datetime +from queue import Queue +from uuid import uuid1 + +import pytest +from pytest_mock import MockerFixture + +import otaclient_iot_logging_server.aws_iot_logger +from otaclient_iot_logging_server._common import LogMessage, LogsQueue +from otaclient_iot_logging_server.aws_iot_logger import ( + AWSIoTLogger, + get_log_stream_name, +) + +logger = logging.getLogger(__name__) + +MODULE = otaclient_iot_logging_server.aws_iot_logger.__name__ + +_UNIX_EPOCH = datetime(1970, 1, 1, 0, 0) +_UNIX_EPOCH_FMT = "1970/01/01" + + +@pytest.mark.parametrize( + "_thing_name, _suffix, _expected", + [ + ( + "some_thingname", + "some_suffix", + f"{_UNIX_EPOCH_FMT}/some_thingname/some_suffix", + ), + ( + _thing_name := f"profile-dev-edge-{uuid1()}-Core", + _suffix := "some_ecu", + f"{_UNIX_EPOCH_FMT}/{_thing_name}/{_suffix}", + ), + ], +) +def test_get_log_stream_name( + _thing_name: str, _suffix: str, _expected: str, mocker: 
MockerFixture +): + _datetime_mock = mocker.MagicMock(spec=datetime) + _datetime_mock.now.return_value = _UNIX_EPOCH + mocker.patch(f"{MODULE}.datetime", _datetime_mock) + assert get_log_stream_name(_thing_name, _suffix) == _expected + + +_mocked_ECUs_list = ("main_ecu", "sub_ecu0", "sub_ecu1", "sub_ecu2", "sub_ecu3") + + +def generate_random_msgs( + msg_len: int, + msg_num: int, + ecus_list: tuple[str, ...] = _mocked_ECUs_list, +) -> list[tuple[str, LogMessage]]: + _res: list[tuple[str, LogMessage]] = [] + for _ in range(msg_num): + _ecu, *_ = random.sample(ecus_list, 1) + _msg = os.urandom(msg_len).hex() + _timestamp = int(time.time()) * 1000 # milliseconds + _res.append((_ecu, LogMessage(timestamp=_timestamp, message=_msg))) + return _res + + +class TestAWSIoTLogger: + MSG_LEN = 16 + MSG_NUM = 4096 + + class _TestFinished(Exception): + pass + + def _mocked_put_log_events(self, _ecu_id: str, _logs: list[LogMessage]): + self._test_result[_ecu_id] = _logs + + @pytest.fixture + def prepare_test_data(self): + _msgs = generate_random_msgs(self.MSG_LEN, self.MSG_NUM) + # prepare result for test_thread_main + _merged_msgs: dict[str, list[LogMessage]] = defaultdict(list) + for _ecu_id, _log_msg in _msgs: + _merged_msgs[_ecu_id].append(_log_msg) + self._merged_msgs = _merged_msgs + # prepare the queue for test + _queue: LogsQueue = Queue() + for _item in _msgs: + _queue.put_nowait(_item) + self._queue = _queue + + @pytest.fixture(autouse=True) + def setup_test(self, prepare_test_data, mocker: MockerFixture): + _time_mocker = mocker.MagicMock(spec=time) + # NOTE: a hack here to interrupt the while loop + _time_mocker.sleep.side_effect = self._TestFinished + mocker.patch(f"{MODULE}.time", _time_mocker) + # ------ prepare test self ------ # + # The following bound variables will be used in thread_main method. + # NOTE: another hack to let all entries being merged within one + # loop iteration. 
+ self._max_logs_per_merge = float("inf") + self.put_log_events = self._mocked_put_log_events + self._interval = 6 # place holder + self._session_config = mocker.MagicMock() # place holder + # for holding test results + # mocked_send_messages will record each calls in this dict + self._test_result: dict[str, list[LogMessage]] = {} + # mock get_log_stream_name to let it returns the log_stream_suffix + # as it, make the test easier. + # see get_log_stream_name signature for more details + get_log_stream_name_mock = mocker.MagicMock(wraps=lambda x, y: y) + mocker.patch(f"{MODULE}.get_log_stream_name", get_log_stream_name_mock) + + def test_thread_main(self, mocker: MockerFixture): + func_to_test = AWSIoTLogger.thread_main + self._create_log_group = mocked__create_log_group = mocker.MagicMock( + spec=AWSIoTLogger._create_log_group + ) + + # ------ execution ------ # + with pytest.raises(self._TestFinished): + func_to_test.__get__(self)() + logger.info("execution finished") + + # ------ check result ------ # + mocked__create_log_group.assert_called_once() + # confirm the send_messages mock receives the expecting calls. + assert self._merged_msgs == self._test_result diff --git a/tests/test_boto3_session.py b/tests/test_boto3_session.py new file mode 100644 index 0000000..a49ab58 --- /dev/null +++ b/tests/test_boto3_session.py @@ -0,0 +1,134 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +from typing import Any + +import pytest +from awsiot_credentialhelper.boto3_session import Boto3SessionProvider +from awsiot_credentialhelper.boto3_session import Pkcs11Config as aws_PKcs11Config +from pytest_mock import MockerFixture + +import otaclient_iot_logging_server.boto3_session +from otaclient_iot_logging_server._utils import parse_pkcs11_uri +from otaclient_iot_logging_server.boto3_session import ( # type: ignore + _convert_to_pem, + get_session, +) +from otaclient_iot_logging_server.greengrass_config import ( + IoTSessionConfig, + PKCS11Config, +) +from tests.conftest import TEST_DATA_DPATH + +MODULE = otaclient_iot_logging_server.boto3_session.__name__ + +SAMPLE_CERT_PEM_FPATH = TEST_DATA_DPATH / "sample_cert.pem" +SAMPLE_CERT_DER_FPATH = TEST_DATA_DPATH / "sample_cert.der" + + +@pytest.mark.parametrize( + "_in, _expected", + [ + ( + pem_cert := SAMPLE_CERT_PEM_FPATH.read_bytes(), + pem_cert, + ), + (SAMPLE_CERT_DER_FPATH.read_bytes(), pem_cert), + ], +) +def test__convert_to_pem(_in: bytes, _expected: bytes): + assert _convert_to_pem(_in) == _expected + + +_MOCKED_CERT = b"mocked_certs" +_PKCS11_PRIVKEY_URI = "pkcs11:object=greengrass_privkey;type=private" +_PARSED_PKCS11_PRIVKEY_URI = parse_pkcs11_uri(_PKCS11_PRIVKEY_URI) + + +@pytest.mark.parametrize( + "_config, _expected_call", + [ + # test#1: boto3 session without pkcs11 + ( + test1_cfg := IoTSessionConfig( + account_id="test_account", + ca_path="test_capath", + private_key_path="test_privkey_path", + certificate_path="test_cert_path", + thing_name="test_thing_name", + profile="test_profile", + region="test_region", + aws_credential_provider_endpoint="test_cred_endpoint", + ), + { + "endpoint": test1_cfg.aws_credential_provider_endpoint, + "role_alias": test1_cfg.aws_role_alias, + "certificate": _MOCKED_CERT, + "private_key": test1_cfg.private_key_path, + "thing_name": test1_cfg.thing_name, + }, + ), + # test#2: boto3 session with pkcs11 + ( + test2_cfg 
:= IoTSessionConfig( + account_id="test_account", + ca_path="test_capath", + private_key_path=_PKCS11_PRIVKEY_URI, + certificate_path="test_cert_path", + thing_name="test_thing_name", + profile="test_profile", + region="test_region", + aws_credential_provider_endpoint="test_cred_endpoint", + pkcs11_config=( + test2_pkcs11_cfg := PKCS11Config( + pkcs11_lib="tpm2-pkcs11_lib", + slot_id="1", + user_pin="userpin", + ) + ), + ), + { + "endpoint": test2_cfg.aws_credential_provider_endpoint, + "role_alias": test2_cfg.aws_role_alias, + "certificate": _MOCKED_CERT, + "thing_name": test2_cfg.thing_name, + "pkcs11": aws_PKcs11Config( + pkcs11_lib=test2_pkcs11_cfg.pkcs11_lib, + slot_id=int(test2_pkcs11_cfg.slot_id), + user_pin=test2_pkcs11_cfg.user_pin, + private_key_label=_PARSED_PKCS11_PRIVKEY_URI["object"], + ), + }, + ), + ], +) +def test_get_session( + _config: IoTSessionConfig, _expected_call: dict[str, Any], mocker: MockerFixture +): + """ + Confirm with specific input IoTSessionConfig, we get the expected Boto3Session being created. + """ + # ------ setup test ------ # + _boto3_session_provider_mock = mocker.MagicMock(spec=Boto3SessionProvider) + mocker.patch(f"{MODULE}.Boto3SessionProvider", _boto3_session_provider_mock) + mocker.patch( + f"{MODULE}._load_certificate", mocker.MagicMock(return_value=_MOCKED_CERT) + ) + # ------ execution ------ # + get_session(_config) + # ------ check result ------ # + _boto3_session_provider_mock.assert_called_once_with(**_expected_call) diff --git a/tests/test_config_file_monitor.py b/tests/test_config_file_monitor.py new file mode 100644 index 0000000..dc6dcc3 --- /dev/null +++ b/tests/test_config_file_monitor.py @@ -0,0 +1,53 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +from pathlib import Path + +import pytest +from pytest_mock import MockerFixture + +from otaclient_iot_logging_server import config_file_monitor + + +class _SuccessExit(Exception): + """config file monitor successfully kills the server.""" + + +class TestConfigFileMonitor: + + @pytest.fixture(autouse=True) + def setup_set(self, tmp_path: Path, mocker: MockerFixture): + self.config_file = config_file = tmp_path / "config_file" + config_file.write_text("config_file") + config_file_monitor.monitored_config_files.add(str(config_file)) + + # hack time.sleep to modify the config_file + def _modify_config_file(*args, **kwargs): + config_file.write_text("another config_file") + + mocker.patch.object( + config_file_monitor.time, + "sleep", + mocker.MagicMock(wraps=_modify_config_file), + ) + + # mock os.kill to raise SuccessExit exception + mocker.patch("os.kill", mocker.MagicMock(side_effect=_SuccessExit)) + + def test_config_file_monitor(self): + with pytest.raises(_SuccessExit): + config_file_monitor._config_file_monitor() diff --git a/tests/test_configs.py b/tests/test_configs.py new file mode 100644 index 0000000..cc91a31 --- /dev/null +++ b/tests/test_configs.py @@ -0,0 +1,154 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import os +from typing import Any + +import pytest +from pytest_mock import MockerFixture + +from otaclient_iot_logging_server.configs import ( + ConfigurableLoggingServerConfig, + load_profile_info, +) +from tests.conftest import TEST_DATA_DPATH + +AWS_PROFILE_INFO_FPATH = TEST_DATA_DPATH / "aws_profile_info.yaml" + + +@pytest.mark.parametrize( + "_mock_envs, _expected", + [ + # test#0: check default settings: + ( + {}, + { + "GREENGRASS_V1_CONFIG": "/greengrass/config/config.json", + "GREENGRASS_V2_CONFIG": "/greengrass/v2/init_config/config.yaml", + "AWS_PROFILE_INFO": "/opt/ota/iot-logger/aws_profile_info.yaml", + "LISTEN_ADDRESS": "127.0.0.1", + "LISTEN_PORT": 8083, + "UPLOAD_LOGGING_SERVER_LOGS": False, + "SERVER_LOGSTREAM_SUFFIX": "iot_logging_server", + "SERVER_LOGGING_LEVEL": "INFO", + "SERVER_LOGGING_LOG_FORMAT": "[%(asctime)s][%(levelname)s]-%(name)s:%(funcName)s:%(lineno)d,%(message)s", + "MAX_LOGS_BACKLOG": 4096, + "MAX_LOGS_PER_MERGE": 512, + "UPLOAD_INTERVAL": 3, + "ECU_INFO_YAML": "/boot/ota/ecu_info.yaml", + "EXIT_ON_CONFIG_FILE_CHANGED": True, + }, + ), + # test#1: frequently changed settings + ( + { + "LISTEN_ADDRESS": "172.16.1.1", + "SERVER_LOGGING_LEVEL": "ERROR", + "UPLOAD_INTERVAL": "30", + }, + { + "GREENGRASS_V1_CONFIG": "/greengrass/config/config.json", + "GREENGRASS_V2_CONFIG": "/greengrass/v2/init_config/config.yaml", + "AWS_PROFILE_INFO": "/opt/ota/iot-logger/aws_profile_info.yaml", + "LISTEN_ADDRESS": "172.16.1.1", + "LISTEN_PORT": 8083, + "UPLOAD_LOGGING_SERVER_LOGS": False, + 
"SERVER_LOGSTREAM_SUFFIX": "iot_logging_server", + "SERVER_LOGGING_LEVEL": "ERROR", + "SERVER_LOGGING_LOG_FORMAT": "[%(asctime)s][%(levelname)s]-%(name)s:%(funcName)s:%(lineno)d,%(message)s", + "MAX_LOGS_BACKLOG": 4096, + "MAX_LOGS_PER_MERGE": 512, + "UPLOAD_INTERVAL": 30, + "ECU_INFO_YAML": "/boot/ota/ecu_info.yaml", + "EXIT_ON_CONFIG_FILE_CHANGED": True, + }, + ), + # test#2: change everything + ( + { + "GREENGRASS_V1_CONFIG": "ggv1_cfg.json", + "GREENGRASS_V2_CONFIG": "ggv2_cfg.yaml", + "AWS_PROFILE_INFO": "aws_profile_info.yaml", + "LISTEN_ADDRESS": "172.16.1.1", + "LISTEN_PORT": "12345", + "UPLOAD_LOGGING_SERVER_LOGS": "true", + "SERVER_LOGSTREAM_SUFFIX": "test_logging_server", + "SERVER_LOGGING_LEVEL": "DEBUG", + "SERVER_LOGGING_LOG_FORMAT": "someformat", + "MAX_LOGS_BACKLOG": "1024", + "MAX_LOGS_PER_MERGE": "128", + "UPLOAD_INTERVAL": "10", + "ECU_INFO_YAML": "/some/where/ecu_info.yaml", + "EXIT_ON_CONFIG_FILE_CHANGED": "false", + }, + { + "GREENGRASS_V1_CONFIG": "ggv1_cfg.json", + "GREENGRASS_V2_CONFIG": "ggv2_cfg.yaml", + "AWS_PROFILE_INFO": "aws_profile_info.yaml", + "LISTEN_ADDRESS": "172.16.1.1", + "LISTEN_PORT": 12345, + "UPLOAD_LOGGING_SERVER_LOGS": True, + "SERVER_LOGSTREAM_SUFFIX": "test_logging_server", + "SERVER_LOGGING_LEVEL": "DEBUG", + "SERVER_LOGGING_LOG_FORMAT": "someformat", + "MAX_LOGS_BACKLOG": 1024, + "MAX_LOGS_PER_MERGE": 128, + "UPLOAD_INTERVAL": 10, + "ECU_INFO_YAML": "/some/where/ecu_info.yaml", + "EXIT_ON_CONFIG_FILE_CHANGED": False, + }, + ), + ], +) +def test_server_config_loading( + _mock_envs: dict[str, str], + _expected: dict[str, Any], + mocker: MockerFixture, +): + # patch environmental variables while clearing all already + mocker.patch.dict(os.environ, _mock_envs, clear=True) + # NOTE: compare by dict to prevent double import from env vars + assert _expected == ConfigurableLoggingServerConfig().model_dump() + + +@pytest.mark.parametrize( + "_in, _expected", + [ + ( + str(AWS_PROFILE_INFO_FPATH), + [ + { + "profile_name": 
"profile-dev", + "account_id": "012345678901", + "credential_endpoint": "abcdefghijk01.credentials.iot.region.amazonaws.com", + }, + { + "profile_name": "profile-stg", + "account_id": "012345678902", + "credential_endpoint": "abcdefghijk02.credentials.iot.region.amazonaws.com", + }, + { + "profile_name": "profile-prd", + "account_id": "012345678903", + "credential_endpoint": "abcdefghijk03.credentials.iot.region.amazonaws.com", + }, + ], + ), + ], +) +def test_load_profile_info(_in: str, _expected: dict[str, Any]): + assert load_profile_info(_in).model_dump() == _expected diff --git a/tests/test_ecu_info.py b/tests/test_ecu_info.py new file mode 100644 index 0000000..ac64e9a --- /dev/null +++ b/tests/test_ecu_info.py @@ -0,0 +1,39 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from otaclient_iot_logging_server.ecu_info import parse_ecu_info + +TESTS_DIR = Path(__file__).parent / "data" + + +@pytest.mark.parametrize( + ["ecu_info_fname", "expected_ecu_id_set"], + ( + ( + "ecu_info.yaml", + {"sub1", "sub2", "sub3", "main"}, + ), + ), +) +def test_ecu_info(ecu_info_fname: str, expected_ecu_id_set: set[str]): + ecu_info_fpath = TESTS_DIR / ecu_info_fname + assert (ecu_info_cfg := parse_ecu_info(ecu_info_fpath)) + assert ecu_info_cfg.ecu_id_set == expected_ecu_id_set diff --git a/tests/test_greengrass_config.py b/tests/test_greengrass_config.py new file mode 100644 index 0000000..46beac3 --- /dev/null +++ b/tests/test_greengrass_config.py @@ -0,0 +1,163 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +import logging +import uuid +from dataclasses import dataclass + +import pytest +from pytest_mock import MockerFixture + +import otaclient_iot_logging_server.greengrass_config +from otaclient_iot_logging_server.greengrass_config import ( + IoTSessionConfig, + PKCS11Config, + get_profile_from_thing_name, + parse_config, + parse_v1_config, + parse_v2_config, +) +from tests.conftest import TEST_DATA_DPATH + +logger = logging.getLogger(__name__) + +MODULE = otaclient_iot_logging_server.greengrass_config.__name__ +# NOTE: AWS_PROFILE_INFO, GREENGRASS_V1_CONFIG and GREENGRASS_V2_CONFIG +# environmental variables are properly set in pyproject.toml. +# profile_info in configs.py is populated with aws_profile_info.yaml in tests/data. +# NOTE: gg_v1_cfg and gg_v2_cfg is the same, besides the thing_name, +# this will be used as evidence to check which config is used. +GG_V1_CFG_FPATH = TEST_DATA_DPATH / "gg_v1_cfg.json" +GG_V1_CFG_RAW = GG_V1_CFG_FPATH.read_text() +CFG_FROM_GG_V1 = IoTSessionConfig( + account_id="012345678901", + ca_path="/greengrass/certs/root.ca.pem", + private_key_path="/greengrass/certs/gg.private.key", + certificate_path="/greengrass/certs/gg.cert.pem", + thing_name="profile-dev-edge-ggv1-Core", + profile="profile-dev", + region="region", + aws_credential_provider_endpoint="abcdefghijk01.credentials.iot.region.amazonaws.com", +) + +GG_V2_CFG_FPATH = TEST_DATA_DPATH / "gg_v2_cfg.yaml" +GG_V2_CFG_RAW = GG_V2_CFG_FPATH.read_text() +CFG_FROM_GG_V2 = IoTSessionConfig( + account_id="012345678901", + ca_path="/greengrass/certs/root.ca.pem", + private_key_path="/greengrass/certs/gg.private.key", + certificate_path="/greengrass/certs/gg.cert.pem", + thing_name="profile-dev-edge-ggv2-Core", + profile="profile-dev", + region="region", + aws_credential_provider_endpoint="abcdefghijk01.credentials.iot.region.amazonaws.com", +) + +GG_V2_TPM2_CFG_FPATH = TEST_DATA_DPATH / "gg_v2_cfg.yaml_tpm2.0" +GG_V2_TPM2_CFG_RAW = 
GG_V2_TPM2_CFG_FPATH.read_text() +CFG_FROM_GG_V2_TPM2 = IoTSessionConfig( + account_id="012345678901", + ca_path="/greengrass/certs/root.ca.pem", + private_key_path="pkcs11:object=greengrass_key;type=private;pin-value=greengrass_userpin", + certificate_path="pkcs11:object=greengrass_key;type=cert;pin-value=greengrass_userpin", + thing_name="profile-dev-edge-ggv2-Core", + profile="profile-dev", + region="region", + aws_credential_provider_endpoint="abcdefghijk01.credentials.iot.region.amazonaws.com", + pkcs11_config=PKCS11Config( + pkcs11_lib="/usr/lib/x86_64-linux-gnu/pkcs11/libtpm2_pkcs11.so", + slot_id="1", + user_pin="greengrass_userpin", + ), +) + + +@pytest.mark.parametrize( + "_in, _expected", + [ + (f"thing/profile-stg-edge-{uuid.uuid1()}-Core", "profile-stg"), + (f"profile-dev-edge-{uuid.uuid1()}-Core", "profile-dev"), + ], +) +def test_get_profile_from_thing_name(_in: str, _expected: str): + assert get_profile_from_thing_name(_in) == _expected + + +# +# ------ greengrass v1 configuration ------ # +# +# NOTE: support for ggv1 tpm2.0 is not implemented. 
+@pytest.mark.parametrize( + "_raw_cfg, _expected", + [(GG_V1_CFG_RAW, CFG_FROM_GG_V1)], +) +def test_parse_v1_config(_raw_cfg: str, _expected: IoTSessionConfig): + assert parse_v1_config(_raw_cfg) == _expected + + +# +# ------ greengrass v2 configuration ------ # +# +@pytest.mark.parametrize( + "_raw_cfg, _expected", + [ + (GG_V2_CFG_RAW, CFG_FROM_GG_V2), + (GG_V2_TPM2_CFG_RAW, CFG_FROM_GG_V2_TPM2), + ], +) +def test_parse_v2_config(_raw_cfg: str, _expected: IoTSessionConfig): + assert parse_v2_config(_raw_cfg) == _expected + + +# +# ------ test parse_config entry point ------ # +# +@dataclass +class _ServerConfig: + GREENGRASS_V2_CONFIG: str + GREENGRASS_V1_CONFIG: str + + +class TestParseConfig: + def test_greengrass_v1_cfg_only(self, mocker: MockerFixture): + _server_cfg = _ServerConfig( + GREENGRASS_V1_CONFIG=str(GG_V1_CFG_FPATH), + GREENGRASS_V2_CONFIG="/path/not/exists", + ) + mocker.patch(f"{MODULE}.server_cfg", _server_cfg) + + assert parse_config() == CFG_FROM_GG_V1 + + def test_greengrass_v2_cfg_only(self, mocker: MockerFixture): + _server_cfg = _ServerConfig( + GREENGRASS_V1_CONFIG="/path/not/exists", + GREENGRASS_V2_CONFIG=str(GG_V2_CFG_FPATH), + ) + mocker.patch(f"{MODULE}.server_cfg", _server_cfg) + + assert parse_config() == CFG_FROM_GG_V2 + + def test_both_exist(self, mocker: MockerFixture): + """ + Greengrass V2 config should take priority. + """ + _server_cfg = _ServerConfig( + GREENGRASS_V1_CONFIG=str(GG_V1_CFG_FPATH), + GREENGRASS_V2_CONFIG=str(GG_V2_CFG_FPATH), + ) + mocker.patch(f"{MODULE}.server_cfg", _server_cfg) + assert parse_config() == CFG_FROM_GG_V2 diff --git a/tests/test_log_proxy_server.py b/tests/test_log_proxy_server.py new file mode 100644 index 0000000..9113cfa --- /dev/null +++ b/tests/test_log_proxy_server.py @@ -0,0 +1,172 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import logging +import os +import random +from dataclasses import dataclass +from http import HTTPStatus +from pathlib import Path +from queue import Queue +from urllib.parse import urljoin + +import aiohttp +import aiohttp.client_exceptions +import pytest +from aiohttp import web +from pytest_mock import MockerFixture + +import otaclient_iot_logging_server.log_proxy_server as log_server_module +from otaclient_iot_logging_server._common import LogsQueue +from otaclient_iot_logging_server.ecu_info import parse_ecu_info +from otaclient_iot_logging_server.log_proxy_server import LoggingPostHandler + +logger = logging.getLogger(__name__) + +MODULE = log_server_module.__name__ +TEST_DIR = Path(__file__).parent / "data" + + +@dataclass +class _ServerConfig: + """Minimum set of server_config needed for this test.""" + + LISTEN_ADDRESS: str = "127.0.0.1" + LISTEN_PORT: int = 8083 + ECU_INFO_YAML: Path = TEST_DIR / "ecu_info.yaml" + # remember to disable config file monitor + EXIT_ON_CONFIG_FILE_CHANGED: bool = False + + +_test_server_cfg = _ServerConfig() + + +@dataclass +class MessageEntry: + ecu_id: str + message: str + + +# see data/ecu_info.yaml +mocked_ECUs_list = ("main", "sub1", "sub2", "sub3") + + +def generate_random_msgs( + ecus_list: tuple[str, ...] 
= mocked_ECUs_list, + msg_len: int = 16, + msg_num: int = 4096, +) -> list[MessageEntry]: + _res: list[MessageEntry] = [] + for _ in range(msg_num): + _ecu, *_ = random.sample(ecus_list, 1) + _msg = os.urandom(msg_len).hex() + _res.append(MessageEntry(_ecu, _msg)) + return _res + + +class TestLogProxyServer: + + SERVER_URL = ( + f"http://{_test_server_cfg.LISTEN_ADDRESS}:{_test_server_cfg.LISTEN_PORT}/" + ) + TOTAL_MSG_NUM = 4096 + + @pytest.fixture(autouse=True) + def mock_ecu_info(self, mocker: MockerFixture): + ecu_info = parse_ecu_info(TEST_DIR / "ecu_info.yaml") + mocker.patch(f"{MODULE}.ecu_info", ecu_info) + + @pytest.fixture(autouse=True) + async def launch_server(self, mocker: MockerFixture, mock_ecu_info): + """ + See https://docs.aiohttp.org/en/stable/web_advanced.html#custom-resource-implementation + for more details. + """ + mocker.patch(f"{MODULE}.server_cfg", _test_server_cfg) + + queue: LogsQueue = Queue() + self._queue = queue + + handler = LoggingPostHandler(queue) + app = web.Application() + # mute the aiohttp server logging + aiohttp_server_logger = logging.getLogger("aiohttp") + aiohttp_server_logger.setLevel("ERROR") + # add handler to the server + app.add_routes([web.post(r"/{ecu_id}", handler.logging_post_handler)]) + # star the server + runner = web.AppRunner(app) + try: + await runner.setup() + site = web.TCPSite( + runner, _test_server_cfg.LISTEN_ADDRESS, _test_server_cfg.LISTEN_PORT + ) + await site.start() + logger.info(f"test log_proxy_server started at {self.SERVER_URL}") + yield + finally: + await runner.cleanup() + + @pytest.fixture(autouse=True) + async def client_sesion(self): + client_session = aiohttp.ClientSession( + raise_for_status=True, + timeout=aiohttp.ClientTimeout(total=0.2), # for speedup testing + ) + try: + yield client_session + finally: + await client_session.close() + + @pytest.fixture(autouse=True) + def prepare_test_data(self): + self._msgs = generate_random_msgs(msg_num=self.TOTAL_MSG_NUM) + + async def 
test_server(self, client_sesion: aiohttp.ClientSession): + # ------ execution ------ # + logger.info(f"sending {self.TOTAL_MSG_NUM} msgs to {self.SERVER_URL}...") + for item in self._msgs: + _ecu_id, _msg = item.ecu_id, item.message + _log_upload_endpoint_url = urljoin(self.SERVER_URL, _ecu_id) + async with client_sesion.post(_log_upload_endpoint_url, data=_msg): + pass # raise_for_status is set on session + # ------ check result ------ # + # ensure the all msgs are sent in order to the queue by the server. + logger.info("checking all the received messages...") + for item in self._msgs: + _ecu_id, _log_msg = self._queue.get_nowait() + assert _ecu_id == item.ecu_id + assert _log_msg["message"] == item.message + assert self._queue.empty() + + @pytest.mark.parametrize( + "_ecu_id, _data", + [ + # unknowned ECU's request will be dropped + ("bad_ecu_id", "valid_msg"), + # empty message will be dropped + ("main", ""), + ], + ) + async def test_reject_invalid_request( + self, _ecu_id: str, _data: str, client_sesion: aiohttp.ClientSession + ): + with pytest.raises(aiohttp.client_exceptions.ClientResponseError) as exc_info: + _log_upload_endpoint_url = urljoin(self.SERVER_URL, _ecu_id) + async with client_sesion.post(_log_upload_endpoint_url, data=_data): + pass # raise_for_status is set on session + assert exc_info.value.status == HTTPStatus.BAD_REQUEST