Skip to content

Commit

Permalink
Merge branch 'feat--slot-controls' into release--0.24.0
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed May 16, 2024
2 parents 6bd344c + 0e533cc commit 941ff69
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 80 deletions.
15 changes: 5 additions & 10 deletions hatchet_sdk/clients/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
GroupKeyActionEvent,
HeartbeatRequest,
OverridesData,
RefreshTimeoutRequest,
RefreshTimeoutResponse,
ReleaseSlotRequest,
StepActionEvent,
WorkerListenRequest,
WorkerRegisterRequest,
Expand Down Expand Up @@ -341,13 +340,9 @@ def put_overrides_data(self, data: OverridesData):

return response

def refresh_timeout(self, increment_by: str, step_run_id: str = None):
response: RefreshTimeoutResponse = self.client.RefreshTimeout(
RefreshTimeoutRequest(
stepRunId=step_run_id,
incrementTimeoutBy=increment_by,
),
def release_slot(self, step_run_id: str):
self.client.ReleaseSlot(
ReleaseSlotRequest(stepRunId=step_run_id),
timeout=DEFAULT_REGISTER_TIMEOUT,
metadata=get_metadata(self.token),
)

return response
3 changes: 3 additions & 0 deletions hatchet_sdk/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ def log(self, line: str):

self.logger_thread_pool.submit(self._log, line)

def release_slot(self):
return self.client.dispatcher.release_slot(self.stepRunId)

def _put_stream(self, data: str | bytes):
try:
self.client.event.stream(data=data, step_run_id=self.stepRunId)
Expand Down
42 changes: 19 additions & 23 deletions hatchet_sdk/dispatcher_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions hatchet_sdk/dispatcher_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,6 @@ class HeartbeatResponse(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...

class RefreshTimeoutRequest(_message.Message):
__slots__ = ("stepRunId", "incrementTimeoutBy")
STEPRUNID_FIELD_NUMBER: _ClassVar[int]
INCREMENTTIMEOUTBY_FIELD_NUMBER: _ClassVar[int]
stepRunId: str
incrementTimeoutBy: str
def __init__(self, stepRunId: _Optional[str] = ..., incrementTimeoutBy: _Optional[str] = ...) -> None: ...

class RefreshTimeoutResponse(_message.Message):
__slots__ = ("timeoutAt",)
TIMEOUTAT_FIELD_NUMBER: _ClassVar[int]
timeoutAt: _timestamp_pb2.Timestamp
def __init__(self, timeoutAt: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ...

class ReleaseSlotRequest(_message.Message):
__slots__ = ("stepRunId",)
STEPRUNID_FIELD_NUMBER: _ClassVar[int]
Expand Down
Loading

0 comments on commit 941ff69

Please sign in to comment.