1
1
import json
2
+ from collections .abc import Sequence
2
3
from typing import Any , cast
3
4
4
5
from core .variables import SegmentType , Variable
@@ -31,7 +32,7 @@ def _run(self) -> NodeRunResult:
31
32
inputs = self .node_data .model_dump ()
32
33
process_data : dict [str , Any ] = {}
33
34
# NOTE: This node has no outputs
34
- updated_variables : list [Variable ] = []
35
+ updated_variable_selectors : list [Sequence [ str ] ] = []
35
36
36
37
try :
37
38
for item in self .node_data .items :
@@ -98,7 +99,8 @@ def _run(self) -> NodeRunResult:
98
99
value = item .value ,
99
100
)
100
101
variable = variable .model_copy (update = {"value" : updated_value })
101
- updated_variables .append (variable )
102
+ self .graph_runtime_state .variable_pool .add (variable .selector , variable )
103
+ updated_variable_selectors .append (variable .selector )
102
104
except VariableOperatorNodeError as e :
103
105
return NodeRunResult (
104
106
status = WorkflowNodeExecutionStatus .FAILED ,
@@ -107,9 +109,15 @@ def _run(self) -> NodeRunResult:
107
109
error = str (e ),
108
110
)
109
111
112
+ # The `updated_variable_selectors` is a list contains list[str] which not hashable,
113
+ # remove the duplicated items first.
114
+ updated_variable_selectors = list (set (map (tuple , updated_variable_selectors )))
115
+
110
116
# Update variables
111
- for variable in updated_variables :
112
- self .graph_runtime_state .variable_pool .add (variable .selector , variable )
117
+ for selector in updated_variable_selectors :
118
+ variable = self .graph_runtime_state .variable_pool .get (selector )
119
+ if not isinstance (variable , Variable ):
120
+ raise VariableNotFoundError (variable_selector = selector )
113
121
process_data [variable .name ] = variable .value
114
122
115
123
if variable .selector [0 ] == CONVERSATION_VARIABLE_NODE_ID :
0 commit comments