Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suppress ConnectionResetError #7404

Merged
merged 1 commit into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/tribler/core/components/restapi/rest/events_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def passthrough(x):
notifications.report_config_error,
]


MessageDict = Dict[str, Any]


Expand Down Expand Up @@ -143,8 +142,17 @@ async def _write_data(self, message: MessageDict):
self._logger.exception(e)
return

processed_responses = []
for response in self.events_responses:
await response.write(message_bytes)
try:
await response.write(message_bytes)
# by creating the list with processed responses we want to remove responses with
# ConnectionResetError from `self.events_responses`:
processed_responses.append(response)
except ConnectionResetError as e:
# The connection was closed by GUI
self._logger.warning(e, exc_info=True)
self.events_responses = processed_responses

# An exception has occurred in Tribler. The event includes a readable
# string of the error and a Sentry event.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import json
import logging
from asyncio import CancelledError, Event, create_task
from contextlib import suppress
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import pytest
from _pytest.logging import LogCaptureFixture
from aiohttp import ClientSession

from tribler.core import notifications
Expand All @@ -20,7 +22,7 @@
messages_to_wait_for = set()


# pylint: disable=redefined-outer-name
# pylint: disable=redefined-outer-name, protected-access

@pytest.fixture(name='api_port')
def fixture_api_port(free_port):
Expand Down Expand Up @@ -194,3 +196,19 @@ async def test_should_skip_message(events_endpoint):
with patch.object(events_endpoint, '_shutdown', new=True):
# But, if it is in shutdown state, it should always skip a message
assert events_endpoint.should_skip_message(message)


async def test_write_data(events_endpoint: EventsEndpoint, caplog: LogCaptureFixture):
# Test that write_data will call write methods for all responses, even if some of them could raise
# a ConnectionResetError exception.

bad_response = AsyncMock(write=AsyncMock(side_effect=ConnectionResetError))
good_response = AsyncMock()

events_endpoint.events_responses = [bad_response, good_response]
await events_endpoint._write_data({'any': 'data'})

assert bad_response.write.called
assert good_response.write.called
last_log_record = caplog.records[-1]
assert last_log_record.levelno == logging.WARNING