-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: run warmup on Runtimes and Executor #5579
Changes from 16 commits
5817aac
330dd7a
10947b6
967111a
02faa77
23e1777
aa9189d
6824fbf
b70008c
c5155e8
280d481
2885b4e
0f2bf67
2be40b1
4651f3b
d1c7e21
111db68
8090ea8
fcb122b
367a115
34d7d4b
4c86668
a82c128
87ee55f
017dfea
99e0823
e11a41f
ab6119d
e9b6561
479c810
95a5767
9deda11
dd3516b
ada4e5e
a23caf1
51c7e60
855bd7c
7211337
463a998
6232b42
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import asyncio | ||
import ipaddress | ||
import os | ||
import threading | ||
import time | ||
from collections import defaultdict | ||
from dataclasses import dataclass | ||
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union | ||
|
@@ -613,9 +615,6 @@ def _get_connection_list( | |
return self._get_connection_list( | ||
deployment, type_, 0, increase_access_count | ||
) | ||
self._logger.debug( | ||
f'did not find a connection for deployment {deployment}, type {type_} and entity_id {entity_id}. There are {len(self._deployments[deployment][type_]) if deployment in self._deployments else 0} available connections for this deployment and type. ' | ||
) | ||
return None | ||
|
||
def _add_deployment(self, deployment: str): | ||
|
@@ -1114,6 +1113,71 @@ async def task_wrapper(): | |
|
||
return asyncio.create_task(task_wrapper()) | ||
|
||
async def warmup( | ||
self, | ||
deployment: str, | ||
stop_event: threading.Event, | ||
): | ||
'''Executes JinaInfoRPC against the provided deployment. A single task is created for each replica connection. | ||
:param deployment: deployment name and the replicas that needs to be warmed up. | ||
:param stop_event: signal to indicate if an early termination of the task is required for graceful teardown. | ||
''' | ||
|
||
async def task_wrapper(target_warmup_responses, target, channel): | ||
try: | ||
stub = jina_pb2_grpc.JinaInfoRPCStub(channel=channel) | ||
call_result = stub._status( | ||
request=jina_pb2.google_dot_protobuf_dot_empty__pb2.Empty(), | ||
) | ||
await call_result | ||
target_warmup_responses[target] = True | ||
except Exception: | ||
target_warmup_responses[target] = False | ||
|
||
try: | ||
timeout = time.time() + 60 * 5 # 5 minutes from now | ||
warmed_up_targets = set() | ||
|
||
while not stop_event.is_set(): | ||
# refresh channels in case connection has been reset due to InternalNetworkError | ||
target_to_channel = self.__extract_target_to_channel(deployment) | ||
for warmed_target in warmed_up_targets: | ||
target_to_channel.pop(warmed_target) | ||
|
||
replica_warmup_responses = {} | ||
tasks = [] | ||
for target, channel in target_to_channel.items(): | ||
tasks.append( | ||
asyncio.create_task( | ||
task_wrapper(replica_warmup_responses, target, channel) | ||
) | ||
) | ||
await asyncio.gather(*tasks, return_exceptions=True) | ||
|
||
for target, response in replica_warmup_responses.items(): | ||
if response: | ||
warmed_up_targets.add(target) | ||
|
||
if time.time() > timeout or len(target_to_channel) == 0: | ||
return | ||
|
||
await asyncio.sleep(0.2) | ||
except Exception as ex: | ||
self._logger.error(f'error with warmup up task: {ex}') | ||
return | ||
|
||
def __extract_target_to_channel(self, deployment): | ||
replica_set = set() | ||
replica_set.update(self._connections.get_replicas_all_shards(deployment)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can extract out the warmup logic into a |
||
replica_set.add( | ||
self._connections.get_replicas(deployment=deployment, head=True) | ||
) | ||
|
||
target_to_channel = {} | ||
for replica_list in filter(None, replica_set): | ||
target_to_channel.update(replica_list._address_to_channel) | ||
return target_to_channel | ||
|
||
@staticmethod | ||
def __aio_channel_with_tracing_interceptor( | ||
address, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import argparse | ||
import asyncio | ||
import signal | ||
import threading | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove unneeded import |
||
import time | ||
from abc import ABC, abstractmethod | ||
from typing import TYPE_CHECKING, Optional, Union | ||
|
@@ -76,6 +77,8 @@ def _cancel(signum, frame): | |
self._start_time = time.time() | ||
self._loop.run_until_complete(self.async_setup()) | ||
self._send_telemetry_event() | ||
self.warmup_task = None | ||
self.warmup_stop_event = threading.Event() | ||
|
||
def _send_telemetry_event(self): | ||
send_telemetry_event(event='start', obj=self, entity_id=self._entity_id) | ||
|
@@ -161,6 +164,20 @@ async def async_run_forever(self): | |
"""The async method to run until it is stopped.""" | ||
... | ||
|
||
async def cancel_warmup_task(self): | ||
'''Cancel warmup task if exists and is not completed. Cancellation is required if the Flow is being terminated before the | ||
task is successful or hasn't reached the max timeout. | ||
''' | ||
if self.warmup_task: | ||
try: | ||
if not self.warmup_task.done(): | ||
self.logger.debug(f'Cancelling warmup task.') | ||
self.warmup_stop_event.set() | ||
await self.warmup_task | ||
self.warmup_task.exception() | ||
except: | ||
pass | ||
|
||
# Static methods used by the Pod to communicate with the `Runtime` in the separate process | ||
|
||
@staticmethod | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove unneeded imports