Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove manual deletion of channel resources or objects #5633

Merged
merged 6 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/concepts/flow/flow-args.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
| `log_config` | The config name or the absolute path to the YAML config file of the logger used in this object. | `string` | `default` |
| `quiet` | If set, then no log will be emitted from this object. | `boolean` | `False` |
| `quiet_error` | If set, then exception stack information will not be added to the log | `boolean` | `False` |
| `suppress_root_logging` | If set, then root handlers will be suppressed from logging. | `boolean` | `False` |
| `uses` | The YAML path represents a flow. It can be either a local file path or a URL. | `string` | `None` |
| `reload` | If set, auto-reloading on file changes is enabled: the Flow will restart while blocked if YAML configuration source is changed. This also applies to underlying Executors, if their source code or YAML configuration has changed. | `boolean` | `False` |
| `env` | The map of environment variables that are available inside runtime | `object` | `None` |
Expand Down
3 changes: 3 additions & 0 deletions jina/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def Client(
prefetch: Optional[int] = 1000,
protocol: Optional[Union[str, List[str]]] = 'GRPC',
proxy: Optional[bool] = False,
suppress_root_logging: Optional[bool] = False,
tls: Optional[bool] = False,
traces_exporter_host: Optional[str] = None,
traces_exporter_port: Optional[int] = None,
Expand Down Expand Up @@ -55,6 +56,7 @@ def Client(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param suppress_root_logging: If set, then root handlers will be suppressed from logging.
:param tls: If set, connect to gateway using tls encryption
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.
:param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent.
Expand Down Expand Up @@ -109,6 +111,7 @@ def Client(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param suppress_root_logging: If set, then root handlers will be suppressed from logging.
:param tls: If set, connect to gateway using tls encryption
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.
:param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent.
Expand Down
6 changes: 6 additions & 0 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def __init__(
prefetch: Optional[int] = 1000,
protocol: Optional[Union[str, List[str]]] = 'GRPC',
proxy: Optional[bool] = False,
suppress_root_logging: Optional[bool] = False,
tls: Optional[bool] = False,
traces_exporter_host: Optional[str] = None,
traces_exporter_port: Optional[int] = None,
Expand All @@ -145,6 +146,7 @@ def __init__(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param suppress_root_logging: If set, then root handlers will be suppressed from logging.
:param tls: If set, connect to gateway using tls encryption
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.
:param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent.
Expand Down Expand Up @@ -313,6 +315,7 @@ def __init__(
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
reload: Optional[bool] = False,
suppress_root_logging: Optional[bool] = False,
uses: Optional[str] = None,
workspace: Optional[str] = None,
**kwargs,
Expand All @@ -336,6 +339,7 @@ def __init__(
:param quiet: If set, then no log will be emitted from this object.
:param quiet_error: If set, then exception stack information will not be added to the log
:param reload: If set, auto-reloading on file changes is enabled: the Flow will restart while blocked if YAML configuration source is changed. This also applies to underlying Executors, if their source code or YAML configuration has changed.
:param suppress_root_logging: If set, then root handlers will be suppressed from logging.
:param uses: The YAML path represents a flow. It can be either a local file path or a URL.
:param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`.

Expand Down Expand Up @@ -395,6 +399,7 @@ def __init__(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol between server and client.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param suppress_root_logging: If set, then root handlers will be suppressed from logging.
:param tls: If set, connect to gateway using tls encryption
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.
:param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent.
Expand Down Expand Up @@ -495,6 +500,7 @@ def __init__(
:param quiet: If set, then no log will be emitted from this object.
:param quiet_error: If set, then exception stack information will not be added to the log
:param reload: If set, auto-reloading on file changes is enabled: the Flow will restart while blocked if YAML configuration source is changed. This also applies to underlying Executors, if their source code or YAML configuration has changed.
:param suppress_root_logging: If set, then root handlers will be suppressed from logging.
:param uses: The YAML path represents a flow. It can be either a local file path or a URL.
:param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`.

Expand Down
10 changes: 4 additions & 6 deletions jina/serve/networking/connection_pool_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,8 @@ async def _remove_connection(self, deployment, entity_id, address, type):
self._logger.debug(
f'removing connection for deployment {deployment}/{type}/{entity_id} to {address}'
)
connection = await self._deployments[deployment][type][
entity_id
].remove_connection(address)
await self._deployments[deployment][type][entity_id].remove_connection(
address
)
if not self._deployments[deployment][type][entity_id].has_connections():
del self._deployments[deployment][type][entity_id]
return connection
return None
self._deployments[deployment][type].pop(entity_id)
54 changes: 8 additions & 46 deletions jina/serve/networking/replica_list.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
from typing import TYPE_CHECKING, Optional, Sequence, Union
from typing import TYPE_CHECKING, Optional, Sequence
from urllib.parse import urlparse

import grpc
from grpc.aio import ClientInterceptor

from jina.excepts import EstablishGrpcConnectionError
Expand All @@ -18,8 +16,6 @@
OpenTelemetryClientInterceptor,
)

GRACE_PERIOD_DESTROY_CONNECTION = 0.5


class _ReplicaList:
"""
Expand All @@ -44,25 +40,21 @@ def __init__(
self._metrics = metrics
self._histograms = histograms
self._logger = logger
self._destroyed_event = asyncio.Event()
self.aio_tracing_client_interceptors = aio_tracing_client_interceptors
self.tracing_client_interceptors = tracing_client_interceptor
self._deployment_name = deployment_name
# a set containing all the ConnectionStubs that will be created using add_connection
# this set is not updated in reset_connection and remove_connection
self._warmup_stubs = set()

async def reset_connection(
self, address: str, deployment_name: str
) -> Union[grpc.aio.Channel, None]:
async def reset_connection(self, address: str, deployment_name: str):
"""
Removes and then re-adds a connection.
Result is the same as calling :meth:`remove_connection` and then :meth:`add_connection`, but this allows for
handling of race condition if multiple callers reset a connection at the same time.

:param address: Target address of this connection
:param deployment_name: Target deployment of this connection
:returns: The reset connection or None if there was no connection for the given address
"""
self._logger.debug(f'resetting connection for {deployment_name} to {address}')

Expand All @@ -71,26 +63,16 @@ async def reset_connection(
and self._address_to_connection_idx[address] is not None
):
# remove connection:
# in contrast to remove_connection(), we don't 'shorten' the data structures below, instead just set to None
# so if someone else accesses them in the meantime, they know that they can just wait
# in contrast to remove_connection(), we don't 'shorten' the data structures below, instead
# update the data structure with the new connection and let the old connection be collected by
# the GC
id_to_reset = self._address_to_connection_idx[address]
self._address_to_connection_idx[address] = None
connection_to_reset = self._connections[id_to_reset]
self._connections[id_to_reset] = None
channel_to_reset = self._address_to_channel[address]
self._address_to_channel[address] = None
self._destroyed_event.clear()
await self._destroy_connection(channel_to_reset)
self._destroyed_event.set()
# re-add connection:
self._address_to_connection_idx[address] = id_to_reset
stubs, channel = self._create_connection(address, deployment_name)
self._address_to_channel[address] = channel
self._connections[id_to_reset] = stubs

return connection_to_reset
return None

def add_connection(self, address: str, deployment_name: str):
"""
Add connection with address to the connection list
Expand All @@ -107,7 +89,7 @@ def add_connection(self, address: str, deployment_name: str):
stubs, _ = self._create_connection(address, deployment_name)
self._warmup_stubs.add(stubs)

async def remove_connection(self, address: str) -> Union[grpc.aio.Channel, None]:
async def remove_connection(self, address: str):
"""
Remove connection with address from the connection list

Expand All @@ -118,7 +100,6 @@ async def remove_connection(self, address: str) -> Union[grpc.aio.Channel, None]
which is safe to use in this scenario.

:param address: Remove connection for this address
:returns: The removed connection or None if there was not any for the given address
"""
if address in self._address_to_connection_idx:
self._rr_counter = (
Expand All @@ -127,21 +108,12 @@ async def remove_connection(self, address: str) -> Union[grpc.aio.Channel, None]
else 0
)
idx_to_delete = self._address_to_connection_idx.pop(address)
popped_connection = self._connections.pop(idx_to_delete)
closing_channel = self._address_to_channel[address]
del self._address_to_channel[address]
await self._destroy_connection(
closing_channel, grace=GRACE_PERIOD_DESTROY_CONNECTION
)
self._connections.pop(idx_to_delete)
# update the address/idx mapping
for address in self._address_to_connection_idx:
if self._address_to_connection_idx[address] > idx_to_delete:
self._address_to_connection_idx[address] -= 1

return popped_connection

return None

def _create_connection(self, address, deployment_name: str):
parsed_address = urlparse(address)
address = parsed_address.netloc if parsed_address.netloc else address
Expand All @@ -157,10 +129,6 @@ def _create_connection(self, address, deployment_name: str):
)
return stubs, channel

async def _destroy_connection(self, connection, grace=0.5):
# we should handle graceful termination better, 0.5 is a rather random number here
await connection.close(grace)

async def get_next_connection(self, num_retries=3):
"""
Returns a connection from the list. Strategy is round robin
Expand Down Expand Up @@ -193,13 +161,7 @@ async def _get_next_connection(self, num_retries=3):
self._logger.debug(
f' No valid connection found for {self._deployment_name}, give chance for potential resetting of connection'
)
try:
await asyncio.wait_for(
self._destroyed_event.wait(),
timeout=GRACE_PERIOD_DESTROY_CONNECTION,
)
finally:
return await self._get_next_connection(num_retries=num_retries - 1)
return await self._get_next_connection(num_retries=num_retries - 1)
except IndexError:
# This can happen as a race condition while _removing_ connections
self._rr_counter = 0
Expand Down
2 changes: 2 additions & 0 deletions jina_cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
'--quiet',
'--quiet-error',
'--workspace-id',
'--suppress-root-logging',
'--uses',
'--reload',
'--env',
Expand Down Expand Up @@ -401,6 +402,7 @@
'--log-config',
'--protocol',
'--prefetch',
'--suppress-root-logging',
],
},
}
14 changes: 6 additions & 8 deletions tests/unit/serve/networking/test_connection_pool_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ async def test_head_addition_removal(logger, metrics, port_generator):
assert len(replica_list.get_all_connections()) == 1
assert not connection_pool.get_replicas(deployment=head_deployment, head=False)

removed_connection = await connection_pool.remove_head(
deployment=head_deployment, address=head_address
)
assert removed_connection
await connection_pool.remove_head(deployment=head_deployment, address=head_address)
assert not connection_pool.get_replicas(deployment=head_deployment, head=True)


@pytest.mark.asyncio
Expand Down Expand Up @@ -57,14 +55,14 @@ async def test_replica_addition_removal(logger, metrics, port_generator):
deployment=replica_1_deployment, head=False
).has_connections()

removed_connection_0 = await connection_pool.remove_replica(
await connection_pool.remove_replica(
deployment=replica_0_deployment, address=replica_0_address
)
assert removed_connection_0
removed_connection_1 = await connection_pool.remove_replica(
assert not connection_pool.get_replicas(deployment=replica_0_deployment, head=False)
await connection_pool.remove_replica(
deployment=replica_1_deployment, address=replica_1_address
)
assert removed_connection_1
assert not connection_pool.get_replicas(deployment=replica_1_deployment, head=False)


def test_independent_shards_and_replicas(logger, metrics, port_generator):
Expand Down
17 changes: 5 additions & 12 deletions tests/unit/serve/networking/test_replica_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,8 @@ async def test_remove_connection(replica_list):
async def test_reset_connection(replica_list):
replica_list.add_connection('executor0', 'executor-0')
connection_stub = await replica_list.get_next_connection('executor0')
reset_connection_stub = await replica_list.reset_connection(
'executor0', 'executor-0'
)
# returns stub that was reset
assert connection_stub == reset_connection_stub
await replica_list.reset_connection('executor0', 'executor-0')
new_connection_stub = await replica_list.get_next_connection()
assert new_connection_stub != reset_connection_stub
assert len(replica_list.get_all_connections()) == 1

connection_stub_random_address = await replica_list.reset_connection(
Expand All @@ -74,9 +69,7 @@ async def test_close(replica_list):

async def _print_channel_attributes(connection_stub: _ConnectionStubs):
await asyncio.sleep(0.5)
# currently the channel can get destroyed even if a different co-routine is
# holding a reference
assert not connection_stub.channel.get_state() == ChannelConnectivity.SHUTDOWN
assert connection_stub.channel.get_state() != ChannelConnectivity.SHUTDOWN


@pytest.mark.asyncio
Expand All @@ -92,6 +85,6 @@ async def test_synchornization_when_resetting_connection(replica_list, logger):
),
return_exceptions=True,
)
# TODO invert the assert statement after refactoring because the channel must be
# closed only if there are no references to it.
assert any([issubclass(type(response), BaseException) for response in responses])
assert not any(
[issubclass(type(response), BaseException) for response in responses]
)