Skip to content

Commit

Permalink
fix: worker listener retries (#181)
Browse files Browse the repository at this point in the history
* hotfix: add repository for npm publish

* release(py-sdk): bump version

* chore: ignore venv

* fix: add timeout to retry

* chore: rm change

* fix: remove duplicate logic and favor retries in dispatcher

* release: bump version

* fix: retry on general failures

* chore: rm unused code

* fix: retries reset if greater than interval
  • Loading branch information
grutt authored Feb 22, 2024
1 parent e44c3a1 commit a7cff7f
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,4 @@ tmp
postgres-data
rabbitmq.conf

*encryption-keys
*encryption-keys
53 changes: 34 additions & 19 deletions python-sdk/hatchet_sdk/clients/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import grpc
from typing import Callable, List, Union
from ..metadata import get_metadata
import time


def new_dispatcher(conn, config: ClientConfig):
Expand All @@ -24,11 +25,13 @@ def get_action_listener(self, ctx, req):
def send_step_action_event(self, ctx, in_):
raise NotImplementedError

DEFAULT_ACTION_LISTENER_RETRY_INTERVAL = 1 # seconds
DEFAULT_ACTION_LISTENER_RETRY_COUNT = 5

DEFAULT_ACTION_LISTENER_RETRY_INTERVAL = 5 # seconds
DEFAULT_ACTION_LISTENER_RETRY_COUNT = 15
DEFAULT_ACTION_TIMEOUT = 60 # seconds
DEFAULT_REGISTER_TIMEOUT = 5


class GetActionListenerRequest:
def __init__(self, worker_name: str, services: List[str], actions: List[str]):
self.worker_name = worker_name
Expand Down Expand Up @@ -67,17 +70,19 @@ def __init__(self, client : DispatcherStub, token, worker_id):
self.token = token
self.worker_id = worker_id
self.retries = 0

self.last_connection_attempt = 0
# self.logger = logger
# self.validator = validator

def actions(self):
while True:
logger.info("Listening for actions...")
logger.info(
"Connecting to Hatchet to establish listener for actions...")

try:
for assigned_action in self.get_listen_client():
assigned_action : AssignedAction
self.retries = 0
assigned_action: AssignedAction

# Process the received action
action_type = self.map_action_type(assigned_action.actionType)
Expand Down Expand Up @@ -110,10 +115,6 @@ def actions(self):
# Context cancelled, unsubscribe and close
# self.logger.debug("Context cancelled, closing listener")
break
elif e.code() == grpc.StatusCode.UNAVAILABLE:
# Retry logic
logger.info("Could not connect to Hatchet, retrying...")
self.retries = self.retries + 1
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
logger.info("Deadline exceeded, retrying subscription")
continue
Expand All @@ -122,7 +123,8 @@ def actions(self):
# self.logger.error(f"Failed to receive message: {e}")
# err_ch(e)
logger.error(f"Failed to receive message: {e}")
break

self.retries = self.retries + 1

def parse_action_payload(self, payload : str):
try:
Expand All @@ -143,19 +145,32 @@ def map_action_type(self, action_type):
return None

def get_listen_client(self):
current_time = int(time.time())

if current_time-self.last_connection_attempt > DEFAULT_ACTION_LISTENER_RETRY_INTERVAL:
self.retries = 0

if self.retries > DEFAULT_ACTION_LISTENER_RETRY_COUNT:
raise Exception(f"Could not subscribe to the worker after {DEFAULT_ACTION_LISTENER_RETRY_COUNT} retries")
elif self.retries > 1:
raise Exception(
f"Could not subscribe to the worker after {DEFAULT_ACTION_LISTENER_RETRY_COUNT} retries")
elif self.retries >= 1:
# logger.info
# if we are retrying, we wait for a bit. this should eventually be replaced with exp backoff + jitter
time.sleep(DEFAULT_ACTION_LISTENER_RETRY_INTERVAL)

return self.client.Listen(WorkerListenRequest(
workerId=self.worker_id
),
timeout=DEFAULT_ACTION_TIMEOUT,
metadata=get_metadata(self.token),
)
logger.info(
f"Could not connect to Hatchet, retrying... {self.retries}/{DEFAULT_ACTION_LISTENER_RETRY_COUNT}")

listener = self.client.Listen(WorkerListenRequest(
workerId=self.worker_id
),
timeout=DEFAULT_ACTION_TIMEOUT,
metadata=get_metadata(self.token),
)

self.last_connection_attempt = current_time

logger.info('Listener established.')
return listener

def unregister(self):
try:
Expand Down
12 changes: 3 additions & 9 deletions python-sdk/hatchet_sdk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
from .workflow import WorkflowMeta
from .clients.dispatcher import GetActionListenerRequest, ActionListenerImpl, Action
from .dispatcher_pb2 import ActionType, StepActionEvent, StepActionEventType, GroupKeyActionEvent, GroupKeyActionEventType, STEP_EVENT_TYPE_COMPLETED, STEP_EVENT_TYPE_STARTED, STEP_EVENT_TYPE_FAILED, GROUP_KEY_EVENT_TYPE_STARTED, GROUP_KEY_EVENT_TYPE_COMPLETED, GROUP_KEY_EVENT_TYPE_FAILED
from .client import new_client
from .client import new_client
from concurrent.futures import ThreadPoolExecutor, Future
from google.protobuf.timestamp_pb2 import Timestamp
from .context import Context
from .logger import logger

# Worker class

class Worker:
def __init__(self, name: str, max_threads: int = 200, debug=False, handle_kill=True):
self.name = name
Expand Down Expand Up @@ -347,11 +347,5 @@ def start(self, retry_count=1):
except grpc.RpcError as rpc_error:
logger.error(f"Could not start worker: {rpc_error}")

# if we are here, but not killing, then we should retry start
if not self.killing:
if retry_count > 5:
raise Exception("Could not start worker after 5 retries")

logger.info("Could not start worker, retrying...")

self.start(retry_count + 1)
logger.info("Could not start worker")
2 changes: 1 addition & 1 deletion python-sdk/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.10.3"
version = "0.10.4"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"
Expand Down

0 comments on commit a7cff7f

Please sign in to comment.