Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support gRPC logging endpoint #27

Merged
merged 11 commits into from
Feb 21, 2025
Merged

Conversation

airkei
Copy link
Contributor

@airkei airkei commented Jan 8, 2025

Why

https://tier4.atlassian.net/browse/RT4-13461
This PR support gRPC interface in addition to the existing HTTP interface.
This PR is not breaking change, this PR doesn't support metrics yet(will do in next PR).

What

gRPC server will be created in addition to the existing HTTP server. They work in parallel in the different ports.

  • LISTEN_PORT for HTTP
  • LISTEN_PORT_GRPC for gRPC

Once the gRPC server receives the request, it puts same format data as the existing to the queue, then a thread receives the data and put the logs to cloudwatch. This PR doesn't change the data format in the queue.
By following ota-client, it converts the parameters to instance variables then handle it in the servicer.

Test

  • Verified that the existing pytest and new tests/test_log_proxy_server.py tests are passed.
  • Verified that log was output into CloudWatch as expected from ECU(VM).

@airkei airkei changed the title feat!: migrate the interface from REST to gRPC DNM: feat!: migrate the interface from REST to gRPC Jan 8, 2025
Comment on lines -39 to -83
class LoggingPostHandler:
"""A simple aiohttp server handler that receives logs from otaclient."""

def __init__(self, queue: LogsQueue) -> None:
self._queue = queue
self._allowed_ecus = None

if ecu_info:
self._allowed_ecus = ecu_info.ecu_id_set
logger.info(
f"setup allowed_ecu_id from ecu_info.yaml: {ecu_info.ecu_id_set}"
)
else:
logger.warning(
"no ecu_info.yaml presented, logging upload filtering is DISABLED"
)

# route: POST /{ecu_id}
async def logging_post_handler(self, request: Request):
"""
NOTE: use <ecu_id> as log_stream_suffix, each ECU has its own
logging stream for uploading.
"""
_ecu_id = request.match_info["ecu_id"]
_raw_logging = await request.text()
_allowed_ecus = self._allowed_ecus
# don't allow empty request or unknowned ECUs
# if ECU id is unknown(not listed in ecu_info.yaml), drop this log.
if not _raw_logging or (_allowed_ecus and _ecu_id not in _allowed_ecus):
return web.Response(status=HTTPStatus.BAD_REQUEST)

_logging_msg = LogMessage(
timestamp=int(time.time()) * 1000, # milliseconds
message=_raw_logging,
)
# logger.debug(f"receive log from {_ecu_id}: {_logging_msg}")
try:
self._queue.put_nowait((_ecu_id, _logging_msg))
except Full:
logger.debug(f"message dropped: {_logging_msg}")
return web.Response(status=HTTPStatus.SERVICE_UNAVAILABLE)

return web.Response(status=HTTPStatus.OK)


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to servicer.py.

@airkei airkei force-pushed the feat!/migrate_from_rest_to_grpc branch from dffca6c to fe4edae Compare January 16, 2025 02:31
@pytest.fixture(autouse=True)
def prepare_test_data(self):
self._msgs = generate_random_msgs(msg_num=self.TOTAL_MSG_NUM)

async def test_server(self, client_sesion: aiohttp.ClientSession):
async def test_http_server(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test for HTTP

Comment on lines 103 to 105
loop.create_task(_http_server_launcher())
loop.create_task(_grpc_server_launcher())
loop.run_forever()
Copy link
Contributor Author

@airkei airkei Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create both http and grpc servers in different ports and run in parallel.

@airkei airkei changed the title DNM: feat!: migrate the interface from REST to gRPC DNM: feat: migrate the interface from REST to gRPC Jan 31, 2025
Comment on lines 86 to 102
async def put_log_http(self, request: Request) -> PutLogResponse:
"""
put log message from HTTP POST request.
"""
_ecu_id = request.match_info["ecu_id"]
_message = await request.text()

_code = await self.put_log(_ecu_id, _message)

if _code == ErrorCode.NO_MESSAGE or _code == ErrorCode.NOT_ALLOWED_ECU_ID:
_status = HTTPStatus.BAD_REQUEST
elif _code == ErrorCode.SERVER_QUEUE_FULL:
_status = HTTPStatus.SERVICE_UNAVAILABLE
else:
_status = HTTPStatus.OK

return web.Response(status=_status)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP handler

)
# logger.debug(f"receive log from {ecu_id}: {_logging_msg}")
try:
self._queue.put_nowait((ecu_id, _logging_msg))
Copy link
Contributor Author

@airkei airkei Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this PR don't change the data format in the queue(ecu_id, message). will change it in next PR.

@airkei airkei changed the title DNM: feat: migrate the interface from REST to gRPC DNM: feat: support gRPC endpoint. Jan 31, 2025
@airkei airkei force-pushed the feat!/migrate_from_rest_to_grpc branch from 5ab0e2c to 518b04f Compare February 10, 2025 01:06
@airkei airkei changed the title DNM: feat: support gRPC endpoint. feat: support gRPC endpoint. Feb 10, 2025
Copy link
Contributor

github-actions bot commented Feb 10, 2025

Coverage

Coverage Report
FileStmtsMissCoverMissing
src/otaclient_iot_logging_server
   __init__.py30100% 
   __main__.py19194%52
   _common.py150100% 
   _log_setting.py271062%63, 65–66, 68–69, 73–74, 77–78, 80
   _sd_notify.py33875%42, 52, 57–59, 65–67
   _utils.py54296%73, 137
   _version.py80100% 
   aws_iot_logger.py1015545%65–67, 69–72, 75, 81–87, 90–92, 95–100, 104–107, 111–113, 116–118, 121–126, 139, 145–147, 149–153, 157, 197, 204–207
   boto3_session.py35974%50, 58–59, 61, 76–77, 81, 83, 91
   config_file_monitor.py44686%64–66, 83–85
   configs.py46197%75
   ecu_info.py37197%75
   greengrass_config.py97594%155, 266–269
   log_proxy_server.py482939%46–47, 49–51, 54–56, 59–60, 64, 67, 69–70, 73, 76, 80–82, 84–85, 89–90, 97–98, 100–101, 106, 112
   servicer.py53590%58, 90–92, 108
src/otaclient_iot_logging_server/v1
   __init__.py10100% 
   _types.py470100% 
   api_stub.py140100% 
TOTAL68213280% 

Tests Skipped Failures Errors Time
53 0 💤 0 ❌ 0 🔥 16.966s ⏱️

@airkei airkei force-pushed the feat!/migrate_from_rest_to_grpc branch 3 times, most recently from 0a4cd5c to 22de49b Compare February 10, 2025 04:26
@airkei airkei force-pushed the feat!/migrate_from_rest_to_grpc branch from 22de49b to adb11d1 Compare February 10, 2025 04:31
Comment on lines +156 to +180
@pytest.mark.asyncio
@pytest.fixture
async def launch_grpc_server(self, mocker: MockerFixture, mock_ecu_info):
mocker.patch(f"{MODULE}.server_cfg", _test_server_cfg)

queue: LogsQueue = Queue()
self._queue = queue

servicer = OTAClientIoTLoggingServerServicer(
ecu_info=self._ecu_info,
queue=queue,
)

server = grpc.aio.server()
v1_grpc.add_OTAClientIoTLoggingServiceServicer_to_server(
servicer=OTAClientIoTLoggingServiceV1(servicer), server=server
)
server.add_insecure_port(self.SERVER_URL_GRPC)
try:
await server.start()
yield
finally:
await server.stop(None)
await server.wait_for_termination()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

launch grpc test server.

Comment on lines +232 to +237
async def test_grpc_server_check(self, _service: str, launch_grpc_server):
_req = pb2.HealthCheckRequest(service=_service)
async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
_response = await stub.Check(_req)
assert _response.status == pb2.HealthCheckResponse.SERVING
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test for Check funciton.

Comment on lines +96 to +112
async def http_put_log(self, request: Request) -> PutLogResponse:
"""
put log message from HTTP POST request.
"""
_ecu_id = request.match_info["ecu_id"]
_message = await request.text()

_code = await self._put_log(ecu_id=_ecu_id, message=_message)

if _code == ErrorCode.NO_MESSAGE or _code == ErrorCode.NOT_ALLOWED_ECU_ID:
_status = HTTPStatus.BAD_REQUEST
elif _code == ErrorCode.SERVER_QUEUE_FULL:
_status = HTTPStatus.SERVICE_UNAVAILABLE
else:
_status = HTTPStatus.OK

return web.Response(status=_status)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the existing HTTP handler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied proto_wrapper and its test files from ota-client.

Comment on lines +239 to +266
async def test_grpc_server_put_log(self, launch_grpc_server):
# ------ execution ------ #
logger.info(f"sending {self.TOTAL_MSG_NUM} msgs to {self.SERVER_URL_GRPC}...")

async def send_put_log_msg(item):
_req = pb2.PutLogRequest(
ecu_id=item.ecu_id,
log_type=item.log_type,
timestamp=item.timestamp,
level=item.level,
message=item.message,
)
async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
_response = await stub.PutLog(_req)
assert _response.code == pb2.ErrorCode.NO_FAILURE

for item in self._msgs:
await send_put_log_msg(item)

# ------ check result ------ #
# ensure the all msgs are sent in order to the queue by the server.
logger.info("checking all the received messages...")
for item in self._msgs:
_ecu_id, _log_msg = self._queue.get_nowait()
assert _ecu_id == item.ecu_id
assert _log_msg["message"] == item.message
assert self._queue.empty()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test for PutLog.

Comment on lines +268 to +289
async def test_gprc_reject_invalid_ecu_id(
self,
launch_grpc_server,
):
_req = pb2.PutLogRequest(
ecu_id="bad_ecu_id",
message="valid_msg",
)
async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
_response = await stub.PutLog(_req)
assert _response.code == pb2.ErrorCode.NOT_ALLOWED_ECU_ID

async def test_grpc_reject_invalid_message(self, launch_grpc_server):
_req = pb2.PutLogRequest(
ecu_id="main",
message="",
)
async with grpc.aio.insecure_channel(self.SERVER_URL_GRPC) as channel:
stub = v1_grpc.OTAClientIoTLoggingServiceStub(channel)
_response = await stub.PutLog(_req)
assert _response.code == pb2.ErrorCode.NO_MESSAGE
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test for PutLog(error cases).

@airkei airkei changed the title feat: support gRPC endpoint. feat: support gRPC endpoint Feb 10, 2025
Comment on lines 61 to 64
async def _http_server_launcher():
handler = OTAClientIoTLoggingServerServicer(ecu_info=ecu_info, queue=queue)
app = web.Application()
app.add_routes([web.post(r"/{ecu_id}", handler.http_put_log)])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set HTTP handler, call http_put_log when HTTP server receives requests.

Comment on lines 79 to 83
servicer = OTAClientIoTLoggingServerServicer(
ecu_info=ecu_info,
queue=queue,
)
otaclient_iot_logging_service_v1 = OTAClientIoTLoggingServiceV1(servicer)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set gRPC handler, will call grpc_put_log via stub.

@airkei airkei changed the title feat: support gRPC endpoint feat: support gRPC logging endpoint Feb 10, 2025
@airkei airkei marked this pull request as ready for review February 10, 2025 04:45
@airkei airkei requested a review from a team as a code owner February 10, 2025 04:45
@airkei airkei added the feature label Feb 10, 2025
@airkei
Copy link
Contributor Author

airkei commented Feb 11, 2025

There are many changes, but the most important changing files are the followings.

  • log_proxy_server.py
  • servicer.py

Zhenfeng-Sun
Zhenfeng-Sun previously approved these changes Feb 14, 2025
Copy link

@Zhenfeng-Sun Zhenfeng-Sun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me

Copy link

Please retry analysis of this Pull-Request directly on SonarQube Cloud

@airkei airkei requested a review from Zhenfeng-Sun February 19, 2025 01:20
@airkei airkei self-assigned this Feb 19, 2025
Copy link
Member

@Bodong-Yang Bodong-Yang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the PR! 👍

Nice and clean architecture, LGTM!
I only have some minor comment below, please take a look.

@Bodong-Yang
Copy link
Member

Bodong-Yang commented Feb 20, 2025

@airkei Also, about how otaclient detects otaclient-iot-logging-server grpc support.

I know that the Check grpc API has been added, and otaclient can use this API to check whether the otaclient-iot-logging-server is up or not.
But I am not sure this API can be used to let otaclient detect whether logging-server supports grpc or not.

Considering the fact that logging-server on main ECU might be launched slower than otaclient on sub ECU. How can otaclient distinguishes logging-server is not up or logging-server doesn't have grpc support?

@airkei
Copy link
Contributor Author

airkei commented Feb 21, 2025

@airkei Also, about how otaclient detects otaclient-iot-logging-server grpc support.
I know that the Check grpc API has been added, and otaclient can use this API to check whether the otaclient-iot-logging-server is up or not.
But I am not sure this API can be used to let otaclient detect whether logging-server supports grpc or not.
Considering the fact that logging-server on main ECU might be launched slower than otaclient on sub ECU. How can otaclient distinguishes logging-server is not up or logging-server doesn't have grpc support?

Good question. I think there are two options.

  1. Add new synchronization schema from sub-otaclient to main-logger via REST
    1. when sub-otaclient wakes up, sub-otaclient send the empty message to main-logger(e.g: curl -X POST http://127.0.0.1:8083/autoware) until it success
    2. sub-otaclient wait for main-logger wake up or timeout
    3. once sub-otaclient detects the wake up of main-logger, judge if GRPC is supported or not by sending Check

PROS: we will not lose any message from sub-ecu
CONS:
- we still need to depend on HTTP for GRPC logging.
- sub-otaclient[critical application] need to wait for main-logging[non-critical application] wake up.

  1. check grpc supports by polling
    1. sub-otaclient send gRPC Check message with a constant interval like 1 min until it succeeds or timeout(10 min). Until it success, use the existing HTTP for logging

PROS: don't rely on HTTP for GRPC communication, sub-otaclient don't wait for main-logger.
CONS: Check will be continued to send by sub-ecu until main-logger wakeup or timeout.

I prefer Option.2, it doesn't break the existing sequence(dependency) between otaclient and logger.
But currently, the initial log from sub-ecu might be lost. If we think it is issue, we can adopt Option.1.

Anyway, either option will not depends on the logger implementation(this PR).

@airkei airkei requested a review from Bodong-Yang February 21, 2025 00:31
@Bodong-Yang
Copy link
Member

Bodong-Yang commented Feb 21, 2025

But currently, the initial log from sub-ecu might be lost. If we think it is issue, we can adopt Option.1.

The initial log needs to be uploaded(as the initial logs contain important information like otaclient version and bootloader controller startup logs), so option1 is preferred I think. 🙏

  • we still need to depend on HTTP for GRPC logging.

Yes... But I am not expecting we can get rid of it in the near future... considering even otaclient v3.5.1 has many deployments in the production. It will be a long progress 😢

( By the way, due to the mismatch of startup time between main ECU iot-logger and sub ECU's otaclient, currently in most cases we will lost the initial logs from sub ECU 😢 , as otaclient doesn't implement retry mechanism on log pushing, let's resolve this later.

Copy link
Member

@Bodong-Yang Bodong-Yang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you!

@airkei
Copy link
Contributor Author

airkei commented Feb 21, 2025

The initial log needs to be uploaded(as the initial logs contain important information like otaclient version and bootloader controller startup logs), so option1 is preferred I think. 🙏

OK, in that case, let's proceed based on Option.1 :)

@airkei airkei merged commit d0318df into main Feb 21, 2025
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants