Skip to content

Commit 14912b2

Browse files
Nov1c444Novice LeeNovice Lee
authored andcommitted
Feat: continue on error (#11458)
Co-authored-by: Novice Lee <novicelee@NovicedeMacBook-Pro.local> Co-authored-by: Novice Lee <novicelee@NoviPro.local>
1 parent d1d7682 commit 14912b2

31 files changed

+1211
-80
lines changed

api/core/app/apps/advanced_chat/generate_task_pipeline.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
QueueIterationNextEvent,
2020
QueueIterationStartEvent,
2121
QueueMessageReplaceEvent,
22+
QueueNodeExceptionEvent,
2223
QueueNodeFailedEvent,
2324
QueueNodeInIterationFailedEvent,
2425
QueueNodeStartedEvent,
@@ -31,6 +32,7 @@
3132
QueueStopEvent,
3233
QueueTextChunkEvent,
3334
QueueWorkflowFailedEvent,
35+
QueueWorkflowPartialSuccessEvent,
3436
QueueWorkflowStartedEvent,
3537
QueueWorkflowSucceededEvent,
3638
)
@@ -317,7 +319,7 @@ def _process_stream_response(
317319

318320
if response:
319321
yield response
320-
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent):
322+
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
321323
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
322324

323325
response = self._workflow_node_finish_to_stream_response(
@@ -384,6 +386,29 @@ def _process_stream_response(
384386
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
385387
)
386388

389+
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
390+
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
391+
if not workflow_run:
392+
raise Exception("Workflow run not initialized.")
393+
394+
if not graph_runtime_state:
395+
raise Exception("Graph runtime state not initialized.")
396+
397+
workflow_run = self._handle_workflow_run_partial_success(
398+
workflow_run=workflow_run,
399+
start_at=graph_runtime_state.start_at,
400+
total_tokens=graph_runtime_state.total_tokens,
401+
total_steps=graph_runtime_state.node_run_steps,
402+
outputs=event.outputs,
403+
exceptions_count=event.exceptions_count,
404+
conversation_id=None,
405+
trace_manager=trace_manager,
406+
)
407+
408+
yield self._workflow_finish_to_stream_response(
409+
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
410+
)
411+
387412
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
388413
elif isinstance(event, QueueWorkflowFailedEvent):
389414
if not workflow_run:
@@ -401,6 +426,7 @@ def _process_stream_response(
401426
error=event.error,
402427
conversation_id=self._conversation.id,
403428
trace_manager=trace_manager,
429+
exceptions_count=event.exceptions_count,
404430
)
405431

406432
yield self._workflow_finish_to_stream_response(

api/core/app/apps/workflow/app_queue_manager.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
QueueMessageEndEvent,
77
QueueStopEvent,
88
QueueWorkflowFailedEvent,
9+
QueueWorkflowPartialSuccessEvent,
910
QueueWorkflowSucceededEvent,
1011
WorkflowQueueMessage,
1112
)
@@ -34,7 +35,8 @@ def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
3435
| QueueErrorEvent
3536
| QueueMessageEndEvent
3637
| QueueWorkflowSucceededEvent
37-
| QueueWorkflowFailedEvent,
38+
| QueueWorkflowFailedEvent
39+
| QueueWorkflowPartialSuccessEvent,
3840
):
3941
self.stop_listen()
4042

api/core/app/apps/workflow/generate_task_pipeline.py

+61-7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
QueueIterationCompletedEvent,
1616
QueueIterationNextEvent,
1717
QueueIterationStartEvent,
18+
QueueNodeExceptionEvent,
1819
QueueNodeFailedEvent,
1920
QueueNodeInIterationFailedEvent,
2021
QueueNodeStartedEvent,
@@ -26,6 +27,7 @@
2627
QueueStopEvent,
2728
QueueTextChunkEvent,
2829
QueueWorkflowFailedEvent,
30+
QueueWorkflowPartialSuccessEvent,
2931
QueueWorkflowStartedEvent,
3032
QueueWorkflowSucceededEvent,
3133
)
@@ -276,7 +278,7 @@ def _process_stream_response(
276278

277279
if response:
278280
yield response
279-
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent):
281+
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
280282
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
281283

282284
response = self._workflow_node_finish_to_stream_response(
@@ -345,29 +347,81 @@ def _process_stream_response(
345347
yield self._workflow_finish_to_stream_response(
346348
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
347349
)
348-
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
350+
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
349351
if not workflow_run:
350352
raise Exception("Workflow run not initialized.")
351353

352354
if not graph_runtime_state:
353355
raise Exception("Graph runtime state not initialized.")
354356

355-
workflow_run = self._handle_workflow_run_failed(
357+
workflow_run = self._handle_workflow_run_partial_success(
356358
workflow_run=workflow_run,
357359
start_at=graph_runtime_state.start_at,
358360
total_tokens=graph_runtime_state.total_tokens,
359361
total_steps=graph_runtime_state.node_run_steps,
360-
status=WorkflowRunStatus.FAILED
361-
if isinstance(event, QueueWorkflowFailedEvent)
362-
else WorkflowRunStatus.STOPPED,
363-
error=event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
362+
outputs=event.outputs,
363+
exceptions_count=event.exceptions_count,
364364
conversation_id=None,
365365
trace_manager=trace_manager,
366366
)
367367

368368
# save workflow app log
369369
self._save_workflow_app_log(workflow_run)
370370

371+
yield self._workflow_finish_to_stream_response(
372+
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
373+
)
374+
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
375+
if not workflow_run:
376+
raise Exception("Workflow run not initialized.")
377+
378+
if not graph_runtime_state:
379+
raise Exception("Graph runtime state not initialized.")
380+
handle_args = {
381+
"workflow_run": workflow_run,
382+
"start_at": graph_runtime_state.start_at,
383+
"total_tokens": graph_runtime_state.total_tokens,
384+
"total_steps": graph_runtime_state.node_run_steps,
385+
"status": WorkflowRunStatus.FAILED
386+
if isinstance(event, QueueWorkflowFailedEvent)
387+
else WorkflowRunStatus.STOPPED,
388+
"error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
389+
"conversation_id": None,
390+
"trace_manager": trace_manager,
391+
"exceptions_count": event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0,
392+
}
393+
workflow_run = self._handle_workflow_run_failed(**handle_args)
394+
395+
# save workflow app log
396+
self._save_workflow_app_log(workflow_run)
397+
398+
yield self._workflow_finish_to_stream_response(
399+
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
400+
)
401+
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
402+
if not workflow_run:
403+
raise Exception("Workflow run not initialized.")
404+
405+
if not graph_runtime_state:
406+
raise Exception("Graph runtime state not initialized.")
407+
handle_args = {
408+
"workflow_run": workflow_run,
409+
"start_at": graph_runtime_state.start_at,
410+
"total_tokens": graph_runtime_state.total_tokens,
411+
"total_steps": graph_runtime_state.node_run_steps,
412+
"status": WorkflowRunStatus.FAILED
413+
if isinstance(event, QueueWorkflowFailedEvent)
414+
else WorkflowRunStatus.STOPPED,
415+
"error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
416+
"conversation_id": None,
417+
"trace_manager": trace_manager,
418+
"exceptions_count": event.exceptions_count,
419+
}
420+
workflow_run = self._handle_workflow_run_partial_success(**handle_args)
421+
422+
# save workflow app log
423+
self._save_workflow_app_log(workflow_run)
424+
371425
yield self._workflow_finish_to_stream_response(
372426
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
373427
)

api/core/app/apps/workflow_app_runner.py

+39-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
QueueIterationCompletedEvent,
99
QueueIterationNextEvent,
1010
QueueIterationStartEvent,
11+
QueueNodeExceptionEvent,
1112
QueueNodeFailedEvent,
1213
QueueNodeInIterationFailedEvent,
1314
QueueNodeStartedEvent,
@@ -18,20 +19,23 @@
1819
QueueRetrieverResourcesEvent,
1920
QueueTextChunkEvent,
2021
QueueWorkflowFailedEvent,
22+
QueueWorkflowPartialSuccessEvent,
2123
QueueWorkflowStartedEvent,
2224
QueueWorkflowSucceededEvent,
2325
)
2426
from core.workflow.entities.variable_pool import VariablePool
2527
from core.workflow.graph_engine.entities.event import (
2628
GraphEngineEvent,
2729
GraphRunFailedEvent,
30+
GraphRunPartialSucceededEvent,
2831
GraphRunStartedEvent,
2932
GraphRunSucceededEvent,
3033
IterationRunFailedEvent,
3134
IterationRunNextEvent,
3235
IterationRunStartedEvent,
3336
IterationRunSucceededEvent,
3437
NodeInIterationFailedEvent,
38+
NodeRunExceptionEvent,
3539
NodeRunFailedEvent,
3640
NodeRunRetrieverResourceEvent,
3741
NodeRunStartedEvent,
@@ -176,8 +180,12 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
176180
)
177181
elif isinstance(event, GraphRunSucceededEvent):
178182
self._publish_event(QueueWorkflowSucceededEvent(outputs=event.outputs))
183+
elif isinstance(event, GraphRunPartialSucceededEvent):
184+
self._publish_event(
185+
QueueWorkflowPartialSuccessEvent(outputs=event.outputs, exceptions_count=event.exceptions_count)
186+
)
179187
elif isinstance(event, GraphRunFailedEvent):
180-
self._publish_event(QueueWorkflowFailedEvent(error=event.error))
188+
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
181189
elif isinstance(event, NodeRunStartedEvent):
182190
self._publish_event(
183191
QueueNodeStartedEvent(
@@ -253,6 +261,36 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
253261
in_iteration_id=event.in_iteration_id,
254262
)
255263
)
264+
elif isinstance(event, NodeRunExceptionEvent):
265+
self._publish_event(
266+
QueueNodeExceptionEvent(
267+
node_execution_id=event.id,
268+
node_id=event.node_id,
269+
node_type=event.node_type,
270+
node_data=event.node_data,
271+
parallel_id=event.parallel_id,
272+
parallel_start_node_id=event.parallel_start_node_id,
273+
parent_parallel_id=event.parent_parallel_id,
274+
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
275+
start_at=event.route_node_state.start_at,
276+
inputs=event.route_node_state.node_run_result.inputs
277+
if event.route_node_state.node_run_result
278+
else {},
279+
process_data=event.route_node_state.node_run_result.process_data
280+
if event.route_node_state.node_run_result
281+
else {},
282+
outputs=event.route_node_state.node_run_result.outputs
283+
if event.route_node_state.node_run_result
284+
else {},
285+
error=event.route_node_state.node_run_result.error
286+
if event.route_node_state.node_run_result and event.route_node_state.node_run_result.error
287+
else "Unknown error",
288+
execution_metadata=event.route_node_state.node_run_result.metadata
289+
if event.route_node_state.node_run_result
290+
else {},
291+
in_iteration_id=event.in_iteration_id,
292+
)
293+
)
256294
elif isinstance(event, NodeInIterationFailedEvent):
257295
self._publish_event(
258296
QueueNodeInIterationFailedEvent(

api/core/app/entities/queue_entities.py

+44
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ class QueueEvent(StrEnum):
2525
WORKFLOW_STARTED = "workflow_started"
2626
WORKFLOW_SUCCEEDED = "workflow_succeeded"
2727
WORKFLOW_FAILED = "workflow_failed"
28+
WORKFLOW_PARTIAL_SUCCEEDED = "workflow_partial_succeeded"
2829
ITERATION_START = "iteration_start"
2930
ITERATION_NEXT = "iteration_next"
3031
ITERATION_COMPLETED = "iteration_completed"
3132
NODE_STARTED = "node_started"
3233
NODE_SUCCEEDED = "node_succeeded"
3334
NODE_FAILED = "node_failed"
35+
NODE_EXCEPTION = "node_exception"
3436
RETRIEVER_RESOURCES = "retriever_resources"
3537
ANNOTATION_REPLY = "annotation_reply"
3638
AGENT_THOUGHT = "agent_thought"
@@ -237,6 +239,17 @@ class QueueWorkflowFailedEvent(AppQueueEvent):
237239

238240
event: QueueEvent = QueueEvent.WORKFLOW_FAILED
239241
error: str
242+
exceptions_count: int
243+
244+
245+
class QueueWorkflowPartialSuccessEvent(AppQueueEvent):
246+
"""
247+
QueueWorkflowFailedEvent entity
248+
"""
249+
250+
event: QueueEvent = QueueEvent.WORKFLOW_PARTIAL_SUCCEEDED
251+
exceptions_count: int
252+
outputs: Optional[dict[str, Any]] = None
240253

241254

242255
class QueueNodeStartedEvent(AppQueueEvent):
@@ -331,6 +344,37 @@ class QueueNodeInIterationFailedEvent(AppQueueEvent):
331344
error: str
332345

333346

347+
class QueueNodeExceptionEvent(AppQueueEvent):
348+
"""
349+
QueueNodeExceptionEvent entity
350+
"""
351+
352+
event: QueueEvent = QueueEvent.NODE_EXCEPTION
353+
354+
node_execution_id: str
355+
node_id: str
356+
node_type: NodeType
357+
node_data: BaseNodeData
358+
parallel_id: Optional[str] = None
359+
"""parallel id if node is in parallel"""
360+
parallel_start_node_id: Optional[str] = None
361+
"""parallel start node id if node is in parallel"""
362+
parent_parallel_id: Optional[str] = None
363+
"""parent parallel id if node is in parallel"""
364+
parent_parallel_start_node_id: Optional[str] = None
365+
"""parent parallel start node id if node is in parallel"""
366+
in_iteration_id: Optional[str] = None
367+
"""iteration id if node is in iteration"""
368+
start_at: datetime
369+
370+
inputs: Optional[dict[str, Any]] = None
371+
process_data: Optional[dict[str, Any]] = None
372+
outputs: Optional[dict[str, Any]] = None
373+
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
374+
375+
error: str
376+
377+
334378
class QueueNodeFailedEvent(AppQueueEvent):
335379
"""
336380
QueueNodeFailedEvent entity

api/core/app/entities/task_entities.py

+1
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ class Data(BaseModel):
213213
created_by: Optional[dict] = None
214214
created_at: int
215215
finished_at: int
216+
exceptions_count: Optional[int] = 0
216217
files: Optional[Sequence[Mapping[str, Any]]] = []
217218

218219
event: StreamEvent = StreamEvent.WORKFLOW_FINISHED

0 commit comments

Comments
 (0)