feat: support gRPC logging endpoint #27
class LoggingPostHandler:
    """A simple aiohttp server handler that receives logs from otaclient."""

    def __init__(self, queue: LogsQueue) -> None:
        self._queue = queue
        self._allowed_ecus = None

        if ecu_info:
            self._allowed_ecus = ecu_info.ecu_id_set
            logger.info(
                f"setup allowed_ecu_id from ecu_info.yaml: {ecu_info.ecu_id_set}"
            )
        else:
            logger.warning(
                "no ecu_info.yaml presented, logging upload filtering is DISABLED"
            )

    # route: POST /{ecu_id}
    async def logging_post_handler(self, request: Request):
        """
        NOTE: use <ecu_id> as log_stream_suffix, each ECU has its own
            logging stream for uploading.
        """
        _ecu_id = request.match_info["ecu_id"]
        _raw_logging = await request.text()
        _allowed_ecus = self._allowed_ecus
        # don't allow empty request or unknowned ECUs
        # if ECU id is unknown(not listed in ecu_info.yaml), drop this log.
        if not _raw_logging or (_allowed_ecus and _ecu_id not in _allowed_ecus):
            return web.Response(status=HTTPStatus.BAD_REQUEST)

        _logging_msg = LogMessage(
            timestamp=int(time.time()) * 1000,  # milliseconds
            message=_raw_logging,
        )
        # logger.debug(f"receive log from {_ecu_id}: {_logging_msg}")
        try:
            self._queue.put_nowait((_ecu_id, _logging_msg))
        except Full:
            logger.debug(f"message dropped: {_logging_msg}")
            return web.Response(status=HTTPStatus.SERVICE_UNAVAILABLE)

        return web.Response(status=HTTPStatus.OK)
Moved to servicer.py.
dffca6c to fe4edae
@pytest.fixture(autouse=True)
def prepare_test_data(self):
    self._msgs = generate_random_msgs(msg_num=self.TOTAL_MSG_NUM)

async def test_server(self, client_sesion: aiohttp.ClientSession):
async def test_http_server(
Test for the HTTP server.
loop.create_task(_http_server_launcher())
loop.create_task(_grpc_server_launcher())
loop.run_forever()
Creates both the HTTP and gRPC servers on different ports and runs them in parallel.
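As a rough illustration of the pattern in this comment (two listeners sharing one event loop), here is a minimal sketch using only the standard library. It is not the PR's code: plain `asyncio.start_server` listeners stand in for the aiohttp and `grpc.aio` servers, it uses `asyncio.run` with tasks instead of `loop.run_forever()`, and `_echo_line`, `_launch`, `_roundtrip` and `main` are hypothetical names.

```python
import asyncio
from typing import Dict, Tuple


async def _echo_line(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Stand-in request handler: echo one line back, then close."""
    writer.write(await reader.readline())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def _launch(name: str, port: int, bound: Dict[str, int]) -> None:
    """Start one listener and serve until the task is cancelled."""
    server = await asyncio.start_server(_echo_line, "127.0.0.1", port)
    # Port 0 lets the OS pick a free port; record the one actually bound.
    bound[name] = server.sockets[0].getsockname()[1]
    async with server:
        await server.serve_forever()


async def _roundtrip(port: int, payload: bytes) -> bytes:
    """Send one line to a listener and return its reply."""
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(payload)
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply


async def main() -> Tuple[Dict[str, int], Dict[str, bytes]]:
    bound: Dict[str, int] = {}
    # Both launchers are scheduled on the same loop and run concurrently.
    tasks = [
        asyncio.create_task(_launch("http", 0, bound)),
        asyncio.create_task(_launch("grpc", 0, bound)),
    ]
    while len(bound) < 2:
        for task in tasks:
            if task.done():
                task.result()  # re-raise a startup failure instead of spinning
        await asyncio.sleep(0.01)

    replies: Dict[str, bytes] = {}
    for name, port in bound.items():
        replies[name] = await _roundtrip(port, name.encode() + b"\n")

    # Shut both listeners down.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return dict(bound), replies
```

Calling `asyncio.run(main())` returns the two bound ports and the echoed replies, which shows that both listeners were serving at the same time.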
async def put_log_http(self, request: Request) -> PutLogResponse:
    """
    put log message from HTTP POST request.
    """
    _ecu_id = request.match_info["ecu_id"]
    _message = await request.text()

    _code = await self.put_log(_ecu_id, _message)

    if _code == ErrorCode.NO_MESSAGE or _code == ErrorCode.NOT_ALLOWED_ECU_ID:
        _status = HTTPStatus.BAD_REQUEST
    elif _code == ErrorCode.SERVER_QUEUE_FULL:
        _status = HTTPStatus.SERVICE_UNAVAILABLE
    else:
        _status = HTTPStatus.OK

    return web.Response(status=_status)
HTTP handler
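The status mapping in this handler could also be written as a lookup table. The sketch below is only an illustration under assumptions: `ErrorCode` is redefined locally with made-up numeric values (the real enum comes from the service's protobuf definitions), and `to_http_status` is a hypothetical helper name.

```python
from enum import Enum
from http import HTTPStatus


class ErrorCode(Enum):
    # Member names follow the snippet above; the values are assumptions.
    NO_FAILURE = 0
    SERVER_QUEUE_FULL = 1
    NOT_ALLOWED_ECU_ID = 2
    NO_MESSAGE = 3


# Client-side problems map to 400, a full queue maps to 503.
_STATUS_BY_CODE = {
    ErrorCode.NO_MESSAGE: HTTPStatus.BAD_REQUEST,
    ErrorCode.NOT_ALLOWED_ECU_ID: HTTPStatus.BAD_REQUEST,
    ErrorCode.SERVER_QUEUE_FULL: HTTPStatus.SERVICE_UNAVAILABLE,
}


def to_http_status(code: ErrorCode) -> HTTPStatus:
    """Translate a put-log result code into an HTTP status (default: 200)."""
    return _STATUS_BY_CODE.get(code, HTTPStatus.OK)
```

The behaviour matches the if/elif chain: any code not listed in the table falls through to `HTTPStatus.OK`.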
)
# logger.debug(f"receive log from {ecu_id}: {_logging_msg}")
try:
    self._queue.put_nowait((ecu_id, _logging_msg))
This PR doesn't change the data format in the queue (ecu_id, message). That will change in the next PR.
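To make the queue format concrete, here is a minimal sketch of a shared put-log step that both the HTTP and gRPC handlers could call. It is an assumption-laden illustration, not the code in servicer.py: `put_log` is written as a plain function, `ErrorCode` values are invented, and `LogMessage` is modelled as a `TypedDict` because the tests index it with `["message"]`. It computes the timestamp as `int(time.time() * 1000)`, which keeps sub-second precision.

```python
import time
from enum import Enum
from queue import Full, Queue
from typing import Optional, Set, TypedDict


class LogMessage(TypedDict):
    timestamp: int  # milliseconds since the epoch
    message: str


class ErrorCode(Enum):
    # Member names follow the PR; the values are assumptions.
    NO_FAILURE = 0
    SERVER_QUEUE_FULL = 1
    NOT_ALLOWED_ECU_ID = 2
    NO_MESSAGE = 3


def put_log(
    queue: Queue, allowed_ecus: Optional[Set[str]], ecu_id: str, message: str
) -> ErrorCode:
    """Validate one log line and enqueue it as an (ecu_id, LogMessage) tuple."""
    if not message:
        return ErrorCode.NO_MESSAGE
    # Filtering is only active when an allow-list is configured.
    if allowed_ecus and ecu_id not in allowed_ecus:
        return ErrorCode.NOT_ALLOWED_ECU_ID
    log = LogMessage(timestamp=int(time.time() * 1000), message=message)
    try:
        queue.put_nowait((ecu_id, log))
    except Full:
        return ErrorCode.SERVER_QUEUE_FULL
    return ErrorCode.NO_FAILURE
```

With a bounded queue, a second call after the queue fills returns `SERVER_QUEUE_FULL` without blocking, which is what lets the HTTP handler answer 503.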
5ab0e2c to 518b04f
0a4cd5c to 22de49b
22de49b to adb11d1
@pytest.mark.asyncio
@pytest.fixture
async def launch_grpc_server(self, mocker: MockerFixture, mock_ecu_info):
    mocker.patch(f"{MODULE}.server_cfg", _test_server_cfg)

    queue: LogsQueue = Queue()
    self._queue = queue

    servicer = OTAClientIoTLoggingServerServicer(
        ecu_info=self._ecu_info,
        queue=queue,
    )

    server = grpc.aio.server()
    v1_grpc.add_OTAClientIoTLoggingServiceServicer_to_server(
        servicer=OTAClientIoTLoggingServiceV1(servicer), server=server
    )
    server.add_insecure_port(self.SERVER_URL_GRPC)
    try:
        await server.start()
        yield
    finally:
        await server.stop(None)
        await server.wait_for_termination()
Launches the gRPC test server.
async def test_grpc_server_check(self, _service: str, launch_grpc_server):
    _req = pb2.HealthCheckRequest(service=_service)
    async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
        stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
        _response = await stub.Check(_req)
        assert _response.status == pb2.HealthCheckResponse.SERVING
Test for the Check function.
async def http_put_log(self, request: Request) -> PutLogResponse:
    """
    put log message from HTTP POST request.
    """
    _ecu_id = request.match_info["ecu_id"]
    _message = await request.text()

    _code = await self._put_log(ecu_id=_ecu_id, message=_message)

    if _code == ErrorCode.NO_MESSAGE or _code == ErrorCode.NOT_ALLOWED_ECU_ID:
        _status = HTTPStatus.BAD_REQUEST
    elif _code == ErrorCode.SERVER_QUEUE_FULL:
        _status = HTTPStatus.SERVICE_UNAVAILABLE
    else:
        _status = HTTPStatus.OK

    return web.Response(status=_status)
the existing HTTP handler.
src/proto_wrapper/proto_wrapper.py
Outdated
Copied proto_wrapper and its test files from ota-client.
async def test_grpc_server_put_log(self, launch_grpc_server):
    # ------ execution ------ #
    logger.info(f"sending {self.TOTAL_MSG_NUM} msgs to {self.SERVER_URL_GRPC}...")

    async def send_put_log_msg(item):
        _req = pb2.PutLogRequest(
            ecu_id=item.ecu_id,
            log_type=item.log_type,
            timestamp=item.timestamp,
            level=item.level,
            message=item.message,
        )
        async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
            stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
            _response = await stub.PutLog(_req)
            assert _response.code == pb2.ErrorCode.NO_FAILURE

    for item in self._msgs:
        await send_put_log_msg(item)

    # ------ check result ------ #
    # ensure the all msgs are sent in order to the queue by the server.
    logger.info("checking all the received messages...")
    for item in self._msgs:
        _ecu_id, _log_msg = self._queue.get_nowait()
        assert _ecu_id == item.ecu_id
        assert _log_msg["message"] == item.message
    assert self._queue.empty()
Test for PutLog.
async def test_gprc_reject_invalid_ecu_id(
    self,
    launch_grpc_server,
):
    _req = pb2.PutLogRequest(
        ecu_id="bad_ecu_id",
        message="valid_msg",
    )
    async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
        stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
        _response = await stub.PutLog(_req)
        assert _response.code == pb2.ErrorCode.NOT_ALLOWED_ECU_ID

async def test_grpc_reject_invalid_message(self, launch_grpc_server):
    _req = pb2.PutLogRequest(
        ecu_id="main",
        message="",
    )
    async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
        stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
        _response = await stub.PutLog(_req)
        assert _response.code == pb2.ErrorCode.NO_MESSAGE
Test for PutLog (error cases).
async def _http_server_launcher():
    handler = OTAClientIoTLoggingServerServicer(ecu_info=ecu_info, queue=queue)
    app = web.Application()
    app.add_routes([web.post(r"/{ecu_id}", handler.http_put_log)])
Sets the HTTP handler; http_put_log is called when the HTTP server receives a request.
servicer = OTAClientIoTLoggingServerServicer(
    ecu_info=ecu_info,
    queue=queue,
)
otaclient_iot_logging_service_v1 = OTAClientIoTLoggingServiceV1(servicer)
Sets the gRPC handler; grpc_put_log will be called via the stub.
There are many changes, but the most important changed files are the following.
Looks good to me
Thank you for the PR! 👍
Nice and clean architecture, LGTM!
I only have some minor comments below, please take a look.
@airkei Also, about how otaclient detects gRPC support in otaclient-iot-logging-server: considering that the logging-server on the main ECU might launch later than otaclient on a sub ECU, how can otaclient distinguish between the logging-server not being up yet and the logging-server not having gRPC support?
Good question. I think there are two options.
PROS: we will not lose any messages from the sub-ECU.
PROS: doesn't rely on HTTP for gRPC communication; sub-otaclient doesn't wait for main-logger.
I prefer Option.2, as it doesn't break the existing sequence (dependency) between otaclient and logger. Either way, neither option depends on the logger implementation (this PR).
The initial logs need to be uploaded (they contain important information like the otaclient version and the bootloader controller startup logs), so I think Option.1 is preferred. 🙏
Yes... But I am not expecting we can get rid of it in the near future, considering that even otaclient v3.5.1 has many deployments in production. It will be a long process 😢 (By the way, due to the mismatch in startup time between the main ECU's iot-logger and the sub ECU's otaclient, currently in most cases we will lose the initial logs from the sub ECU 😢, as otaclient doesn't implement a retry mechanism for log pushing. Let's resolve this later.)
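For reference, a retry mechanism of the kind mentioned here could look roughly like the sketch below. This is purely illustrative and not otaclient code: `push_with_retry` and its `send` callable are hypothetical, and treating `ConnectionError` as the "server not up yet" signal is an assumption.

```python
import time
from typing import Callable


def push_with_retry(
    send: Callable[[str], None],
    message: str,
    *,
    attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Try to push one log message, backing off exponentially between tries.

    Returns True once `send` succeeds, False if every attempt failed.
    """
    for attempt in range(attempts):
        try:
            send(message)
            return True
        except ConnectionError:
            if attempt == attempts - 1:
                break  # out of attempts, give up
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2**attempt)
    return False
```

The `sleep` parameter is injectable so the backoff schedule can be checked without actually waiting.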
LGTM, thank you!
OK, in that case, let's proceed based on Option.1 :)
Why
https://tier4.atlassian.net/browse/RT4-13461
This PR adds support for a gRPC interface in addition to the existing HTTP interface.
This PR is not a breaking change. It doesn't support metrics yet (that will come in the next PR).
What
A gRPC server is created in addition to the existing HTTP server. They run in parallel on different ports:
LISTEN_PORT for HTTP
LISTEN_PORT_GRPC for gRPC
Once the gRPC server receives a request, it puts data into the queue in the same format as the existing HTTP path; a thread then takes the data from the queue and puts the logs to CloudWatch. This PR doesn't change the data format in the queue.
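The queue-to-uploader flow described above can be sketched as follows. This is a simplified stand-in, not the service's implementation: `drain_once`, `consumer_loop` and the `upload` callback are hypothetical names, and no real CloudWatch client is involved.

```python
import threading
from collections import defaultdict
from queue import Empty, Queue
from typing import Callable, Dict, List


def drain_once(queue: Queue, max_items: int = 100) -> Dict[str, List[dict]]:
    """Take up to max_items (ecu_id, log) tuples and group them per ECU."""
    batches: Dict[str, List[dict]] = defaultdict(list)
    for _ in range(max_items):
        try:
            ecu_id, log = queue.get_nowait()
        except Empty:
            break
        batches[ecu_id].append(log)
    return dict(batches)


def consumer_loop(
    queue: Queue,
    upload: Callable[[str, List[dict]], None],
    stop: threading.Event,
    interval: float = 0.05,
) -> None:
    """Periodically drain the queue and hand each ECU's batch to `upload`."""
    while not stop.is_set():
        for ecu_id, logs in drain_once(queue).items():
            upload(ecu_id, logs)
        stop.wait(interval)
    # One last pass (up to max_items) so queued logs are flushed on shutdown.
    for ecu_id, logs in drain_once(queue).items():
        upload(ecu_id, logs)
```

Because both the HTTP and gRPC handlers put the same `(ecu_id, message)` tuples into the queue, a consumer like this does not need to know which interface a log arrived on.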
Following ota-client, the request parameters are converted to instance variables and then handled in the servicer.
Test
The tests in tests/test_log_proxy_server.py pass.