Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Workflow RPC #15

Merged
merged 10 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ See more in https://github.com/indeedeng/iwf#what-is-iwf
- [x] Start workflow API
- [x] Executing `wait_until`/`execute` APIs and completing workflow
- [x] Parallel execution of multiple states
- [x] GetWorkflowResultsWithWait API
- [x] StateOption: WaitUntil(optional)/Execute API timeout and retry policy
- [x] Get workflow with wait API
- [x] Timer command
Expand All @@ -45,7 +46,6 @@ See more in https://github.com/indeedeng/iwf#what-is-iwf
- [x] Support execute API failure policy
- [ ] Support workflow RPC
- [ ] Signal command
- [ ] Signal workflow API

## Future -- the advanced features that already supported in server. Contributions are welcome to implement them in this SDK!
- [ ] Atomic conditional complete workflow by checking signal/internal channel emptiness
Expand All @@ -54,18 +54,18 @@ See more in https://github.com/indeedeng/iwf#what-is-iwf
- [ ] Describe workflow API
- [ ] TryGetWorkflowResults API
- [ ] Consume N messages in a single command
- [ ] SearchAttribute: keyword
- [ ] New search attribute types: Double, Bool, Datetime, Keyword array, Text
- [ ] Workflow start options: initial search attributes
- [ ] Search workflow API
- [ ] Reset workflow API
- [ ] Skip timer API for testing/operation
- [ ] Decider trigger type: any command combination
- [ ] Failing workflow with results
- [ ] Wait_until API failure policy
- [ ] Caching on persistence
- [ ] Get workflow DataAttributes/SearchAttributes API
- [ ] StateExecutionLocal
- [ ] SearchAttribute: keyword
- [ ] New search attribute types: Double, Bool, Datetime, Keyword array, Text
- [ ] Workflow start options: initial search attributes
- [ ] Search workflow API
- [ ] Get workflow DataAttributes/SearchAttributes API

### Running iwf-server locally

Expand Down
34 changes: 33 additions & 1 deletion iwf/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Any, Optional, Type, TypeVar
import inspect
from typing import Any, Callable, Optional, Type, TypeVar

from iwf.client_options import ClientOptions
from iwf.errors import InvalidArgumentError
from iwf.registry import Registry
from iwf.stop_workflow_options import StopWorkflowOptions
from iwf.unregistered_client import UnregisteredClient, UnregisteredWorkflowOptions
Expand All @@ -12,6 +14,14 @@
T = TypeVar("T")


def get_workflow_type_by_rpc_method(meth) -> str:
if inspect.ismethod(meth):
return inspect.getmro(meth.__self__.__class__)[0].__name__
if inspect.isfunction(meth):
return meth.__qualname__.split(".<locals>", 1)[0].rsplit(".", 1)[0]
raise InvalidArgumentError(f"method {meth} is not a RPC method")


class Client:
def __init__(self, registry: Registry, options: Optional[ClientOptions] = None):
self._registry = registry
Expand Down Expand Up @@ -99,3 +109,25 @@ def stop_workflow(
options: Optional[StopWorkflowOptions] = None,
):
return self._unregistered_client.stop_workflow(workflow_id, "", options)

def invoke_rpc(
self,
workflow_id: str,
rpc: Callable, # this can be a function: RPCWorkflow.rpc_method or a method: workflow_instance.rpc_method
input: Any = None,
return_type_hint: Optional[Type[T]] = None,
) -> Optional[T]:
wf_type = get_workflow_type_by_rpc_method(rpc)
rpc_name = rpc.__name__
rpc_info = self._registry.get_rpc_infos(wf_type)[rpc_name]

return self._unregistered_client.invoke_rpc(
input=input,
workflow_id=workflow_id,
workflow_run_id="",
rpc_name=rpc_name,
timeout_seconds=rpc_info.timeout_seconds,
data_attribute_policy=rpc_info.data_attribute_loading_policy,
all_defined_search_attribute_types=[],
return_type_hint=return_type_hint,
)
20 changes: 18 additions & 2 deletions iwf/communication.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,37 @@
from typing import Any, Optional
from typing import Any, Optional, Union

from iwf_api.models import EncodedObject, InterStateChannelPublishing

from iwf.errors import WorkflowDefinitionError
from iwf.object_encoder import ObjectEncoder
from iwf.state_movement import StateMovement


class Communication:
_type_store: dict[str, Optional[type]]
_object_encoder: ObjectEncoder
_to_publish_internal_channel: dict[str, list[EncodedObject]]
_state_movements: list[StateMovement]

def __init__(
self, type_store: dict[str, Optional[type]], object_encoder: ObjectEncoder
):
self._object_encoder = object_encoder
self._type_store = type_store
self._to_publish_internal_channel = {}
self._state_movements = []

def publish_to_internal_channel(self, channel_name: str, value: Any):
def trigger_state_execution(self, state: Union[str, type], state_input: Any = None):
"""

Args:
state: the workflowState TODO the type hint should be type[WorkflowState]
state_input: the input of the state
"""
movement = StateMovement.create(state, state_input)
self._state_movements.append(movement)

def publish_to_internal_channel(self, channel_name: str, value: Any = None):
if channel_name not in self._type_store:
raise WorkflowDefinitionError(
f"InternalChannel channel_name is not defined {channel_name}"
Expand All @@ -45,3 +58,6 @@ def get_to_publishing_internal_channel(self) -> list[InterStateChannelPublishing
for val in vals:
pubs.append(InterStateChannelPublishing(name, val))
return pubs

def get_to_trigger_state_movements(self) -> list[StateMovement]:
return self._state_movements
8 changes: 8 additions & 0 deletions iwf/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ class WorkflowStillRunningError(ClientSideError):
pass


class WorkflowRPCExecutionError(ClientSideError):
pass


class WorkflowRPCAcquiringLockFailure(ClientSideError):
pass


class WorkflowAlreadyStartedError(ClientSideError):
pass

Expand Down
30 changes: 29 additions & 1 deletion iwf/registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Optional
from typing import Callable, Optional

from iwf.communication_schema import CommunicationMethodType
from iwf.errors import InvalidArgumentError, WorkflowDefinitionError
from iwf.persistence_schema import PersistenceFieldType
from iwf.rpc import RPCInfo
from iwf.workflow import ObjectWorkflow, get_workflow_type
from iwf.workflow_state import WorkflowState, get_state_id

Expand All @@ -13,19 +14,22 @@ class Registry:
_state_store: dict[str, dict[str, WorkflowState]]
_internal_channel_type_store: dict[str, dict[str, Optional[type]]]
_data_attribute_types: dict[str, dict[str, Optional[type]]]
_rpc_infos: dict[str, dict[str, RPCInfo]]

def __init__(self):
self._workflow_store = dict()
self._starting_state_store = dict()
self._state_store = dict()
self._internal_channel_type_store = dict()
self._data_attribute_types = dict()
self._rpc_infos = dict()

def add_workflow(self, wf: ObjectWorkflow):
self._register_workflow_type(wf)
self._register_workflow_state(wf)
self._register_internal_channels(wf)
self._register_data_attributes(wf)
self._register_workflow_rpcs(wf)

def add_workflows(self, *wfs: ObjectWorkflow):
for wf in wfs:
Expand Down Expand Up @@ -62,6 +66,9 @@ def get_internal_channel_types(self, wf_type: str) -> dict[str, Optional[type]]:
def get_data_attribute_types(self, wf_type: str) -> dict[str, Optional[type]]:
return self._data_attribute_types[wf_type]

def get_rpc_infos(self, wf_type: str) -> dict[str, RPCInfo]:
return self._rpc_infos[wf_type]

def _register_workflow_type(self, wf: ObjectWorkflow):
wf_type = get_workflow_type(wf)
if wf_type in self._workflow_store:
Expand Down Expand Up @@ -104,3 +111,24 @@ def _register_workflow_state(self, wf):
starting_state = state_def.state
self._state_store[wf_type] = state_map
self._starting_state_store[wf_type] = starting_state

@staticmethod
def _is_decorated_by_rpc(func: Callable):
return getattr(func, "_is_iwf_rpc", False)

@staticmethod
def _get_rpc_info(func: Callable):
info = getattr(func, "_rpc_info")
assert isinstance(info, RPCInfo)
# NOTE: we have to override the method here so that it's associated the object
info.method_func = func
return info

def _register_workflow_rpcs(self, wf):
wf_type = get_workflow_type(wf)
rpc_infos = {}
for method_name in dir(wf):
method = getattr(wf, method_name)
if callable(method) and self._is_decorated_by_rpc(method):
rpc_infos[method_name] = self._get_rpc_info(method)
self._rpc_infos[wf_type] = rpc_infos
82 changes: 82 additions & 0 deletions iwf/rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from dataclasses import dataclass
from functools import wraps
from inspect import signature
from typing import Any, Callable, Optional

from iwf_api.models import PersistenceLoadingPolicy, PersistenceLoadingType

from iwf.errors import WorkflowDefinitionError


@dataclass
class RPCInfo:
method_func: Callable
timeout_seconds: int
input_type: Optional[type] = None
data_attribute_loading_policy: Optional[PersistenceLoadingPolicy] = None
params_order: Optional[
list
] = None # store this so that the rpc can be invoked with correct parameters


rpc_definition_err = WorkflowDefinitionError(
"an RPC must have at most 5 params: self, context:WorkflowContext, input:Any, persistence:Persistence, "
'communication:Communication, where input can be any type as long as the param name is "input" '
)


def rpc(
timeout_seconds: int = 10,
data_attribute_loading_policy: Optional[PersistenceLoadingPolicy] = None,
):
def decorator(func):
# preserve the properties of the original function.
@wraps(func)
def wrapper(*args, **kwargs):
# TODO need to add type hint for decorated method
return func(*args, **kwargs)

wrapper._is_iwf_rpc = True
rpc_info = RPCInfo(
method_func=func,
timeout_seconds=timeout_seconds,
data_attribute_loading_policy=data_attribute_loading_policy,
)
params = signature(func).parameters

from inspect import _empty # ignored.
from iwf.persistence import Persistence
from iwf.workflow_context import WorkflowContext
from iwf.communication import Communication

valid_param_types = {
_empty: True,
Any: True,
Persistence: True,
WorkflowContext: True,
Communication: True,
}
need_persistence = False
params_order = []
if len(params) > 5:
raise rpc_definition_err

for k, v in params.items():
if k != "self":
params_order.append(v.annotation)
if k == "input":
rpc_info.input_type = v.annotation
continue
if v.annotation == Persistence:
need_persistence = True
if v.annotation not in valid_param_types:
raise rpc_definition_err
if not need_persistence:
rpc_info.data_attribute_loading_policy = PersistenceLoadingPolicy(
persistence_loading_type=PersistenceLoadingType.LOAD_NONE
)
rpc_info.params_order = params_order
wrapper._rpc_info = rpc_info
return wrapper

return decorator
Loading