Skip to content

Commit

Permalink
Call SubOrchestrator by Function Name (#437)
Browse files Browse the repository at this point in the history
* call suborcheatrstor by name

* remove space

* remove blank line

* Update DurableOrchestrationContext.py
  • Loading branch information
nytian authored May 3, 2023
1 parent 021e9ee commit 8a93453
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
30 changes: 26 additions & 4 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,13 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None,
return task

def call_sub_orchestrator(self,
name: str, input_: Optional[Any] = None,
name: Union[str, Callable], input_: Optional[Any] = None,
instance_id: Optional[str] = None) -> TaskBase:
"""Schedule sub-orchestration function named `name` for execution.
Parameters
----------
name: str
name: Union[str, Callable]
The name of the orchestrator function to call.
input_: Optional[Any]
The JSON-serializable input to pass to the orchestrator function.
Expand All @@ -265,19 +265,30 @@ def call_sub_orchestrator(self,
Task
A Durable Task that completes when the called sub-orchestrator completes or fails.
"""
if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
error_message = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
raise ValueError(error_message)

if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, OrchestrationTrigger)

action = CallSubOrchestratorAction(name, input_, instance_id)
task = self._generate_task(action)
return task

def call_sub_orchestrator_with_retry(self,
name: str, retry_options: RetryOptions,
name: Union[str, Callable], retry_options: RetryOptions,
input_: Optional[Any] = None,
instance_id: Optional[str] = None) -> TaskBase:
"""Schedule sub-orchestration function named `name` for execution, with retry-options.
Parameters
----------
name: str
name: Union[str, Callable]
The name of the activity function to schedule.
retry_options: RetryOptions
The settings for retrying this sub-orchestrator in case of a failure.
Expand All @@ -291,6 +302,17 @@ def call_sub_orchestrator_with_retry(self,
Task
A Durable Task that completes when the called sub-orchestrator completes or fails.
"""
if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
error_message = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
raise ValueError(error_message)

if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, OrchestrationTrigger)

action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
task = self._generate_task(action, retry_options)
return task
Expand Down
39 changes: 39 additions & 0 deletions tests/orchestrator/test_sub_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from azure.durable_functions.models.OrchestratorState import OrchestratorState
from azure.durable_functions.models.actions.CallSubOrchestratorAction \
import CallSubOrchestratorAction
import azure.durable_functions as df
import azure.functions as func

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

def generator_function(context):
outputs = []
Expand All @@ -19,6 +22,22 @@ def generator_function(context):

return outputs

def generator_function_call_by_function_name(context):
outputs = []
task1 = yield context.call_sub_orchestrator(HelloSubOrchestrator, "Tokyo")
task2 = yield context.call_sub_orchestrator(HelloSubOrchestrator, "Seattle")
task3 = yield context.call_sub_orchestrator(HelloSubOrchestrator, "London")

outputs.append(task1)
outputs.append(task2)
outputs.append(task3)

return outputs

@app.orchestration_trigger(context_name="context")
def HelloSubOrchestrator(context):
return "Hello" + context

def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState:
return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value)

Expand Down Expand Up @@ -54,3 +73,23 @@ def test_tokyo_and_seattle_and_london_state():

#assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_call_suborchestrator_by_name():
context_builder = ContextBuilder('test_call_suborchestrator_by_name')
add_hello_suborch_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_suborch_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_suborch_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_call_by_function_name)

expected_state = base_expected_state(
['Hello Tokyo!', 'Hello Seattle!', 'Hello London!'])
add_hello_suborch_action(expected_state, 'Tokyo')
add_hello_suborch_action(expected_state, 'Seattle')
add_hello_suborch_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

#assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

0 comments on commit 8a93453

Please sign in to comment.