Skip to content

Commit

Permalink
add HTTP endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
airkei committed Jan 31, 2025
1 parent 139e0dc commit 5ab0e2c
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 32 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The behaviors of the iot_logging_server can be configured with the following env
| ECU_INFO_YAML | `/boot/ota/ecu_info.yaml` | The location of ecu_info.yaml config file. iot-logger server will parse the config file and only process logs sending from known ECUs.|
| LISTEN_ADDRESS | `127.0.0.1` | The IP address iot-logger server listen on. By default only receive logs from local machine. |
| LISTEN_PORT | `8083` | |
| LISTEN_PORT_GRPC | `8084` | |
| UPLOAD_LOGGING_SERVER_LOGS | `false` | Whether to upload the logs from server itself to cloudwatchlogs. |
| SERVER_LOGSTREAM_SUFFIX | `iot_logging_server` | log_stream suffix for local server logs on cloudwatchlogs if uploaded. |
| SERVER_LOGGING_LEVEL | `INFO` | The logging level of the server itself. |
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dynamic = [
"version",
]
dependencies = [
"aiohttp>=3.10.2,<3.11",
"awsiot-credentialhelper>=0.6,<0.7",
"boto3>=1.34.35,<1.35",
"botocore==1.34.35,<1.35",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Automatically generated from pyproject.toml by gen_requirements_txt.py script.
# DO NOT EDIT! Only for reference use.
aiohttp>=3.10.2,<3.11
awsiot-credentialhelper>=0.6,<0.7
boto3>=1.34.35,<1.35
botocore==1.34.35,<1.35
Expand Down
5 changes: 4 additions & 1 deletion src/otaclient_iot_logging_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def main() -> None:
)

root_logger.info(
f"launching iot_logging_server({__version__}) at http://{server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT}"
f"launching HTTP iot_logging_server({__version__}) at http://{server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT}"
)
root_logger.info(
f"launching gRPC iot_logging_server({__version__}) at http://{server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT_GRPC}"
)
root_logger.info(f"iot_logging_server config: \n{server_cfg}")
# ------ launch aws cloudwatch client ------ #
Expand Down
1 change: 1 addition & 0 deletions src/otaclient_iot_logging_server/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class ConfigurableLoggingServerConfig(BaseSettings):

LISTEN_ADDRESS: str = "127.0.0.1"
LISTEN_PORT: int = 8083
LISTEN_PORT_GRPC: int = 8084
UPLOAD_LOGGING_SERVER_LOGS: bool = False
SERVER_LOGSTREAM_SUFFIX: str = "iot_logging_server"
SERVER_LOGGING_LEVEL: _LoggingLevelName = "INFO"
Expand Down
22 changes: 19 additions & 3 deletions src/otaclient_iot_logging_server/log_proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from concurrent.futures import ThreadPoolExecutor

import grpc.aio
from aiohttp import web

from otaclient_iot_logging_server._common import LogsQueue
from otaclient_iot_logging_server._sd_notify import (
Expand All @@ -29,11 +30,11 @@
)
from otaclient_iot_logging_server.configs import server_cfg
from otaclient_iot_logging_server.ecu_info import ecu_info
from otaclient_iot_logging_server.servicer import OTAClientIoTLoggingServerServicer
from otaclient_iot_logging_server.v1 import (
otaclient_iot_logging_server_v1_pb2_grpc as v1_grpc,
)
from otaclient_iot_logging_server.v1.api_stub import OtaClientIoTLoggingServiceV1
from otaclient_iot_logging_server.v1.servicer import OTAClientIoTLoggingServerServicer

logger = logging.getLogger(__name__)

Expand All @@ -57,6 +58,20 @@ def launch_server(queue: LogsQueue) -> None:
READY_MSG,
)

async def _http_server_launcher():
handler = OTAClientIoTLoggingServerServicer(ecu_info=ecu_info, queue=queue)
app = web.Application()
app.add_routes([web.post(r"/{ecu_id}", handler.put_log_http)])

# typing: run_app is a NoReturn method, unless received signal
web.run_app(
app,
host=server_cfg.LISTEN_ADDRESS,
port=server_cfg.LISTEN_PORT,
loop=loop,
) # type: ignore
pass

async def _grpc_server_launcher():
thread_pool = ThreadPoolExecutor(
thread_name_prefix="otaclient_iot_logging_server",
Expand All @@ -72,10 +87,10 @@ async def _grpc_server_launcher():
server=server, servicer=otaclient_iot_logging_service_v1
)
server.add_insecure_port(
f"{server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT}"
f"{server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT_GRPC}"
)
logger.info(
f"launch grpc server at {server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT}"
f"launch grpc server at {server_cfg.LISTEN_ADDRESS}:{server_cfg.LISTEN_PORT_GRPC}"
)

await server.start()
Expand All @@ -85,5 +100,6 @@ async def _grpc_server_launcher():
await server.stop(1)
thread_pool.shutdown(wait=True)

loop.create_task(_http_server_launcher())
loop.create_task(_grpc_server_launcher())
loop.run_forever()
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

import logging
import time
from http import HTTPStatus
from queue import Full

from aiohttp import web
from aiohttp.web import Request

from otaclient_iot_logging_server._common import LogMessage, LogsQueue
from otaclient_iot_logging_server.ecu_info import ECUInfo
from otaclient_iot_logging_server.v1.types import (
Expand Down Expand Up @@ -52,31 +56,57 @@ def __init__(
"no ecu_info.yaml presented, logging upload filtering is DISABLED"
)

async def put_log(self, request: PutLogRequest) -> PutLogResponse:
async def put_log(self, ecu_id, message):
"""
NOTE: use <ecu_id> as log_stream_suffix, each ECU has its own
logging stream for uploading.
Put log message into queue.
"""
_ecu_id = request.ecu_id
_timestamp = (int(time.time()) * 1000,) # milliseconds
_message = request.message
# don't allow empty message request
if not _message:
return PutLogResponse(code=ErrorCode.NO_MESSAGE)
if not message:
return ErrorCode.NO_MESSAGE
# don't allow unknowned ECUs
# if ECU id is unknown(not listed in ecu_info.yaml), drop this log.
if self._allowed_ecus and _ecu_id not in self._allowed_ecus:
return PutLogResponse(code=ErrorCode.NOT_ALLOWED_ECU_ID)
if self._allowed_ecus and ecu_id not in self._allowed_ecus:
return ErrorCode.NOT_ALLOWED_ECU_ID

_timestamp = (int(time.time()) * 1000,) # milliseconds
_logging_msg = LogMessage(
timestamp=_timestamp,
message=_message,
message=message,
)
# logger.debug(f"receive log from {_ecu_id}: {_logging_msg}")
# logger.debug(f"receive log from {ecu_id}: {_logging_msg}")
try:
self._queue.put_nowait((_ecu_id, _logging_msg))
self._queue.put_nowait((ecu_id, _logging_msg))
except Full:
logger.debug(f"message dropped: {_logging_msg}")
return PutLogResponse(code=ErrorCode.SERVER_QUEUE_FULL)
return ErrorCode.SERVER_QUEUE_FULL

return ErrorCode.NO_FAILURE

async def put_log_http(self, request: Request) -> PutLogResponse:
"""
put log message from HTTP POST request.
"""
_ecu_id = request.match_info["ecu_id"]
_message = await request.text()

_code = await self.put_log(_ecu_id, _message)

if _code == ErrorCode.NO_MESSAGE or _code == ErrorCode.NOT_ALLOWED_ECU_ID:
_status = HTTPStatus.BAD_REQUEST
elif _code == ErrorCode.SERVER_QUEUE_FULL:
_status = HTTPStatus.SERVICE_UNAVAILABLE
else:
_status = HTTPStatus.OK

return web.Response(status=_status)

async def put_log_grpc(self, request: PutLogRequest) -> PutLogResponse:
"""
put log message from gRPC request
"""
_ecu_id = request.ecu_id
_message = request.message

return PutLogResponse(code=ErrorCode.NO_FAILURE)
_code = await self.put_log(_ecu_id, _message)
return PutLogResponse(code=_code)
2 changes: 1 addition & 1 deletion src/otaclient_iot_logging_server/v1/api_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ def __init__(self, otaclient_iot_logging_server_stub: Any):
self._stub = otaclient_iot_logging_server_stub

async def PutLog(self, request: pb2.PutLogRequest, context) -> pb2.PutLogResponse:
response = await self._stub.put_log(types.PutLogRequest.convert(request))
response = await self._stub.put_log_grpc(types.PutLogRequest.convert(request))
return response.export_pb()
7 changes: 6 additions & 1 deletion tests/test__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class _ServerCfg:
SERVER_LOGSTREAM_SUFFIX: str = "test_suffix"
LISTEN_ADDRESS: str = "172.16.1.1"
LISTEN_PORT: int = 1234
LISTEN_PORT_GRPC: int = 4321
MAX_LOGS_PER_MERGE: int = 123
MAX_LOGS_BACKLOG: int = 1234
UPLOAD_INTERVAL: int = 12
Expand Down Expand Up @@ -83,8 +84,12 @@ def test_main(
_launch_server_mock.assert_called_once()

# check __main__.main source code for more details
assert (
caplog.records[-3].msg
== f"launching HTTP iot_logging_server({_version}) at http://{_in_server_cfg.LISTEN_ADDRESS}:{_in_server_cfg.LISTEN_PORT}"
)
assert (
caplog.records[-2].msg
== f"launching iot_logging_server({_version}) at http://{_in_server_cfg.LISTEN_ADDRESS}:{_in_server_cfg.LISTEN_PORT}"
== f"launching gRPC iot_logging_server({_version}) at http://{_in_server_cfg.LISTEN_ADDRESS}:{_in_server_cfg.LISTEN_PORT_GRPC}"
)
assert (caplog.records[-1].msg) == f"iot_logging_server config: \n{_in_server_cfg}"
4 changes: 4 additions & 0 deletions tests/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"AWS_PROFILE_INFO": "/opt/ota/iot-logger/aws_profile_info.yaml",
"LISTEN_ADDRESS": "127.0.0.1",
"LISTEN_PORT": 8083,
"LISTEN_PORT_GRPC": 8084,
"UPLOAD_LOGGING_SERVER_LOGS": False,
"SERVER_LOGSTREAM_SUFFIX": "iot_logging_server",
"SERVER_LOGGING_LEVEL": "INFO",
Expand All @@ -66,6 +67,7 @@
"AWS_PROFILE_INFO": "/opt/ota/iot-logger/aws_profile_info.yaml",
"LISTEN_ADDRESS": "172.16.1.1",
"LISTEN_PORT": 8083,
"LISTEN_PORT_GRPC": 8084,
"UPLOAD_LOGGING_SERVER_LOGS": False,
"SERVER_LOGSTREAM_SUFFIX": "iot_logging_server",
"SERVER_LOGGING_LEVEL": "ERROR",
Expand All @@ -85,6 +87,7 @@
"AWS_PROFILE_INFO": "aws_profile_info.yaml",
"LISTEN_ADDRESS": "172.16.1.1",
"LISTEN_PORT": "12345",
"LISTEN_PORT_GRPC": "54321",
"UPLOAD_LOGGING_SERVER_LOGS": "true",
"SERVER_LOGSTREAM_SUFFIX": "test_logging_server",
"SERVER_LOGGING_LEVEL": "DEBUG",
Expand All @@ -101,6 +104,7 @@
"AWS_PROFILE_INFO": "aws_profile_info.yaml",
"LISTEN_ADDRESS": "172.16.1.1",
"LISTEN_PORT": 12345,
"LISTEN_PORT_GRPC": 54321,
"UPLOAD_LOGGING_SERVER_LOGS": True,
"SERVER_LOGSTREAM_SUFFIX": "test_logging_server",
"SERVER_LOGGING_LEVEL": "DEBUG",
Expand Down
Loading

0 comments on commit 5ab0e2c

Please sign in to comment.