-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlog_proxy_server.py
111 lines (91 loc) · 3.59 KB
/
log_proxy_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
import logging
import time
from http import HTTPStatus
from queue import Full, Queue
from aiohttp import web
from aiohttp.web import Request
from otaclient_iot_logging_server._common import LogMessage, LogsQueue
from otaclient_iot_logging_server._sd_notify import (
READY_MSG,
sd_notify,
sd_notify_enabled,
)
from otaclient_iot_logging_server.configs import server_cfg
from otaclient_iot_logging_server.ecu_info import ecu_info
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class LoggingPostHandler:
    """A simple aiohttp server handler that receives logs from otaclient.

    Accepted log messages are pushed into the queue given at construction
    as ``(ecu_id, LogMessage)`` tuples. When ``ecu_info`` is available, only
    requests whose ECU id is listed in ``ecu_info.ecu_id_set`` are accepted.
    """

    def __init__(self, queue: LogsQueue) -> None:
        """Store the target queue and set up the allowed ECU ids.

        Args:
            queue: queue that receives ``(ecu_id, LogMessage)`` tuples.
        """
        self._queue = queue
        # A falsy value (None or an empty set) disables upload filtering.
        self._allowed_ecus = None

        if ecu_info:
            self._allowed_ecus = ecu_info.ecu_id_set
            logger.info(
                f"setup allowed_ecu_id from ecu_info.yaml: {ecu_info.ecu_id_set}"
            )
        else:
            logger.warning(
                "no ecu_info.yaml presented, logging upload filtering is DISABLED"
            )

    # route: POST /{ecu_id}
    async def logging_post_handler(self, request: Request) -> web.Response:
        """Receive one log message from an ECU and enqueue it for uploading.

        NOTE: use <ecu_id> as log_stream_suffix, each ECU has its own
              logging stream for uploading.

        Args:
            request: the incoming POST request; the ``ecu_id`` path segment
                identifies the sender and the request body is the log text.

        Returns:
            ``400 BAD_REQUEST`` if the body is empty or the ECU id is not
            allowed, ``503 SERVICE_UNAVAILABLE`` if the queue is full (the
            message is dropped), otherwise ``200 OK``.
        """
        _ecu_id = request.match_info["ecu_id"]
        _raw_logging = await request.text()
        _allowed_ecus = self._allowed_ecus

        # don't allow empty requests or unknown ECUs:
        # if the ECU id is unknown (not listed in ecu_info.yaml), drop this log.
        if not _raw_logging or (_allowed_ecus and _ecu_id not in _allowed_ecus):
            return web.Response(status=HTTPStatus.BAD_REQUEST)

        _logging_msg = LogMessage(
            # Scale to milliseconds BEFORE truncating to int, so that the
            # sub-second part of the timestamp is preserved.
            timestamp=int(time.time() * 1000),  # milliseconds
            message=_raw_logging,
        )

        try:
            self._queue.put_nowait((_ecu_id, _logging_msg))
        except Full:
            # lazy %-formatting: skip building the message when DEBUG is off
            logger.debug("message dropped: %s", _logging_msg)
            return web.Response(status=HTTPStatus.SERVICE_UNAVAILABLE)
        return web.Response(status=HTTPStatus.OK)
# Delay used when scheduling the systemd ready message on the event loop,
# to give the server some time to start up first.
WAIT_BEFORE_SEND_READY_MSG = 2  # seconds
def launch_server(queue: Queue[tuple[str, LogMessage]]) -> None:
    """Set up the aiohttp logging server and run it.

    A ``LoggingPostHandler`` bound to ``queue`` is registered on
    ``POST /{ecu_id}``, and the app is served on the address and port taken
    from ``server_cfg``. When sd_notify is enabled, sending the ready message
    is scheduled on the event loop with a delay of
    ``WAIT_BEFORE_SEND_READY_MSG`` seconds.

    Args:
        queue: queue handed to the handler for the received log messages.
    """
    post_handler = LoggingPostHandler(queue=queue)
    routes = [web.post(r"/{ecu_id}", post_handler.logging_post_handler)]

    app = web.Application()
    app.add_routes(routes)

    event_loop = asyncio.new_event_loop()
    if sd_notify_enabled():
        logger.info(
            "otaclient-logger service is configured to send ready msg to systemd, "
            f"wait for {WAIT_BEFORE_SEND_READY_MSG} seconds for the server starting up ..."
        )
        # the callback only fires once run_app starts running the loop
        event_loop.call_later(WAIT_BEFORE_SEND_READY_MSG, sd_notify, READY_MSG)

    # typing: run_app is a NoReturn method, unless received signal
    web.run_app(
        app,
        host=server_cfg.LISTEN_ADDRESS,
        port=server_cfg.LISTEN_PORT,
        loop=event_loop,
    )  # type: ignore