From 65efd2d7bbd338e751775748fcf9b62440b8a26e Mon Sep 17 00:00:00 2001 From: KellyRoussel Date: Thu, 27 Jun 2024 15:09:09 +0200 Subject: [PATCH 1/2] fix issue 149 - join wrning --- backend/app/models/workflows/workflow_process_controller.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/app/models/workflows/workflow_process_controller.py b/backend/app/models/workflows/workflow_process_controller.py index e0a25a5f..f8d604fe 100644 --- a/backend/app/models/workflows/workflow_process_controller.py +++ b/backend/app/models/workflows/workflow_process_controller.py @@ -193,6 +193,8 @@ def _generate_produced_text(self): # concatenation of results of last step's validated executions last_step = self.workflow_execution.task.steps[-1] validated_last_step_executions = self.db_session.query(UserWorkflowStepExecution) \ + .join(MdWorkflowStep, + UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \ .filter( UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \ .filter( From 730d479e0b3e203d227a8d1a2da13d9b102dc445 Mon Sep 17 00:00:00 2001 From: KellyRoussel Date: Thu, 27 Jun 2024 15:57:11 +0200 Subject: [PATCH 2/2] update workflow --- .../workflows/workflow_process_controller.py | 112 ++++-------------- .../entities/user_workflow_execution.py | 83 ++++++++++++- .../entities/user_workflow_step_execution.py | 12 +- mojodex_core/entities/workflow_step.py | 28 +++++ 4 files changed, 142 insertions(+), 93 deletions(-) diff --git a/backend/app/models/workflows/workflow_process_controller.py b/backend/app/models/workflows/workflow_process_controller.py index f8d604fe..4f634044 100644 --- a/backend/app/models/workflows/workflow_process_controller.py +++ b/backend/app/models/workflows/workflow_process_controller.py @@ -1,23 +1,14 @@ -from datetime import datetime - from jinja2 import Template - from app import server_socket, socketio_message_sender - - from mojodex_core.tag_manager import TagManager from mojodex_core.produced_text_managers.task_produced_text_manager import TaskProducedTextManager -from mojodex_core.entities.db_base_entities import MdMessage, MdUserTask, MdUserWorkflowStepExecutionResult, \ - MdWorkflowStep +from mojodex_core.entities.db_base_entities import MdMessage from mojodex_core.db import MojodexCoreDB, Session - from mojodex_core.entities.user_workflow_execution import UserWorkflowExecution -from sqlalchemy import case, and_ - from mojodex_core.entities.user_workflow_step_execution import UserWorkflowStepExecution from mojodex_core.mail import send_technical_error_email from mojodex_core.task_execution_title_summary_generator import TaskExecutionTitleSummaryGenerator - +from datetime import datetime class WorkflowProcessController: logger_prefix = "WorkflowProcessController :: " @@ -28,7 +19,8 @@ def __del__(self): def __init__(self, workflow_execution_pk): try: self.db_session = Session(MojodexCoreDB().engine) - self.workflow_execution: UserWorkflowExecution = self.db_session.query(UserWorkflowExecution).get(workflow_execution_pk) + self.workflow_execution: UserWorkflowExecution = self.db_session.query(UserWorkflowExecution).get( + workflow_execution_pk) self._current_step = None except Exception as e: raise Exception(f"{self.logger_prefix} :: __init__ :: {e}") @@ -52,50 +44,22 @@ def _get_next_step_execution_to_run(self): self._current_step = self._generate_new_step_execution(self.workflow_execution.task.steps[0], self.get_formatted_initial_parameters()) # of first step return self._current_step - + last_validated_step_execution = self.workflow_execution.past_valid_step_executions[-1] - if len(self.workflow_execution.past_valid_step_executions) > 1: # if it's not the 1st else no dependency as it was the first step - db_dependency_step = self.db_session.query(MdWorkflowStep) \ - .join(MdUserTask, MdUserTask.task_fk == MdWorkflowStep.task_fk) \ - .filter(MdUserTask.user_task_pk == self.workflow_execution.user_task_fk) \ - .filter(MdWorkflowStep.rank == last_validated_step_execution.workflow_step.rank - 1) \ - .first() + + # find dependency step + db_dependency_step = last_validated_step_execution.workflow_step.dependency_step # find last execution of dependency step - db_dependency_step_execution = self.db_session.query(UserWorkflowStepExecution) \ - .filter( - UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \ - .filter(UserWorkflowStepExecution.workflow_step_fk == db_dependency_step.workflow_step_pk) \ - .order_by(UserWorkflowStepExecution.creation_date.desc()) \ - .first() - db_dependency_step_execution_result = self.db_session.query(MdUserWorkflowStepExecutionResult) \ - .filter( - MdUserWorkflowStepExecutionResult.user_workflow_step_execution_fk == db_dependency_step_execution.user_workflow_step_execution_pk) \ - .order_by(MdUserWorkflowStepExecutionResult.creation_date.desc()) \ - .first() - - # load all validated step executions of current step: - current_step_executions_count = self.db_session.query(UserWorkflowStepExecution) \ - .join(MdWorkflowStep, - UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \ - .filter( - UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \ - .filter( - UserWorkflowStepExecution.workflow_step_fk == last_validated_step_execution.workflow_step.workflow_step_pk) \ - .filter( - case( - # When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True` - - (MdWorkflowStep.user_validation_required == True, - UserWorkflowStepExecution.validated == True), - - # if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value. - else_=True - )) \ - .filter(UserWorkflowStepExecution.error_status.is_(None)) \ - .count() + db_dependency_step_execution = self.workflow_execution.get_last_execution_of_a_step( + db_dependency_step.workflow_step_pk) + db_dependency_step_execution_result = db_dependency_step_execution.user_workflow_step_execution_result + + # count all validated step executions of current step: + current_step_executions_count = self.workflow_execution.count_valid_step_executions_of_a_step( + last_validated_step_execution.workflow_step.workflow_step_pk) # have all parameters been executed and validated? if current_step_executions_count < len(db_dependency_step_execution_result.result): @@ -104,12 +68,9 @@ def _get_next_step_execution_to_run(self): current_parameter) return self._current_step + # else, generate new step execution of next step - next_step = self.db_session.query(MdWorkflowStep) \ - .join(MdUserTask, MdUserTask.task_fk == MdWorkflowStep.task_fk) \ - .filter(MdUserTask.user_task_pk == self.workflow_execution.user_task_fk) \ - .filter(MdWorkflowStep.rank == last_validated_step_execution.workflow_step.rank + 1) \ - .first() + next_step = last_validated_step_execution.workflow_step.next_step # Reached last rank order => there is no next step if next_step is None: return None # end of workflow @@ -154,7 +115,7 @@ def end_workflow_execution(self): self.add_state_message() produced_text_pk, produced_text_version_pk, title, production = self._generate_produced_text() - + title_tag_manager = TagManager("title") draft_tag_manager = TagManager("draft") # add it as a mojo_message @@ -190,39 +151,18 @@ def end_workflow_execution(self): def _generate_produced_text(self): try: - # concatenation of results of last step's validated executions + # For now, production of a workflow is the concatenation of results of all last step's validated executions' results last_step = self.workflow_execution.task.steps[-1] - validated_last_step_executions = self.db_session.query(UserWorkflowStepExecution) \ - .join(MdWorkflowStep, - UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \ - .filter( - UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \ - .filter( - UserWorkflowStepExecution.workflow_step_fk == last_step.workflow_step_pk) \ - .filter( - case( - # When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True` - - ( - MdWorkflowStep.user_validation_required == True, - UserWorkflowStepExecution.validated == True), - - # if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value. - else_=True - ) - ) \ - .filter(UserWorkflowStepExecution.error_status.is_(None)) \ - .order_by(UserWorkflowStepExecution.creation_date.desc()) \ - .all() + validated_last_step_executions = self.workflow_execution.get_valid_executions_of_a_step(last_step.workflow_step_pk) - production = "\n\n".join( - [list(step.result[0].values())[0] for step in validated_last_step_executions[::-1]]) + production = "\n\n".join([list(step.result[0].values())[0] for step in validated_last_step_executions]) produced_text_manager = TaskProducedTextManager(self.workflow_execution.session_id, - self.workflow_execution.user.user_id, - self.workflow_execution.user_task_execution_pk, - self.workflow_execution.task.name_for_system) - produced_text_pk, produced_text_version_pk, title, production, text_type = produced_text_manager.save_produced_text(production, title="", text_type_pk=self.workflow_execution.task.output_text_type_fk) + self.workflow_execution.user.user_id, + self.workflow_execution.user_task_execution_pk, + self.workflow_execution.task.name_for_system) + produced_text_pk, produced_text_version_pk, title, production, text_type = produced_text_manager.save_produced_text( + production, title="", text_type_pk=self.workflow_execution.task.output_text_type_fk) return produced_text_pk, produced_text_version_pk, title, production diff --git a/mojodex_core/entities/user_workflow_execution.py b/mojodex_core/entities/user_workflow_execution.py index b3e5d420..dd3904d0 100644 --- a/mojodex_core/entities/user_workflow_execution.py +++ b/mojodex_core/entities/user_workflow_execution.py @@ -6,6 +6,7 @@ from sqlalchemy import case from sqlalchemy.sql import and_ + class UserWorkflowExecution(UserTaskExecution): @property @@ -36,9 +37,9 @@ def past_valid_step_executions(self): .filter( case( # When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True` - - (MdWorkflowStep.user_validation_required == True, UserWorkflowStepExecution.validated == True), - + + (MdWorkflowStep.user_validation_required == True, UserWorkflowStepExecution.validated == True), + # if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value. else_=True ) @@ -50,7 +51,6 @@ def past_valid_step_executions(self): except Exception as e: raise Exception(f"{self.__class__.__name__}:: past_valid_step_executions :: {e}") - def get_steps_execution_json(self): try: validated_steps_json = [step_execution.to_json() for step_execution in self.past_valid_step_executions] @@ -61,3 +61,78 @@ def get_steps_execution_json(self): return validated_steps_json except Exception as e: raise Exception(f"{self.__class__.__name__}:: get_steps_execution_json :: {e}") + + def count_valid_step_executions_of_a_step(self, workflow_step_pk): + """ + Count the number of valid step executions of a given step for the current workflow execution. + :param workflow_step_pk: + :return: + """ + try: + session = object_session(self) + return session.query(UserWorkflowStepExecution) \ + .join(MdWorkflowStep, + UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \ + .filter( + UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \ + .filter( + UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \ + .filter( + case( + # When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True` + (MdWorkflowStep.user_validation_required == True, + UserWorkflowStepExecution.validated == True), + # if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value. + else_=True + )) \ + .filter(UserWorkflowStepExecution.error_status.is_(None)) \ + .count() + except Exception as e: + raise Exception(f"{self.__class__.__name__} :: count_valid_step_executions_of_a_step :: {e}") + + def get_last_execution_of_a_step(self, workflow_step_pk): + """ + Get the last execution of a given step for the current workflow execution. + :param workflow_step_pk: + :return: + """ + try: + session = object_session(self) + return session.query(UserWorkflowStepExecution) \ + .filter( + UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \ + .filter(UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \ + .order_by(UserWorkflowStepExecution.creation_date.desc()) \ + .first() + except Exception as e: + raise Exception(f"{self.__class__.__name__} :: get_last_execution_of_a_step :: {e}") + + def get_valid_executions_of_a_step(self,workflow_step_pk): + """ + Get the valid executions of a given step for the current workflow execution. + Ordered by creation date in ascending order. (The oldest first) + :param workflow_step_pk: + :return: + """ + try: + session = object_session(self) + return session.query(UserWorkflowStepExecution) \ + .join(MdWorkflowStep, + UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \ + .filter( + UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \ + .filter( + UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \ + .filter( + case( + # When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True` + (MdWorkflowStep.user_validation_required == True, + UserWorkflowStepExecution.validated == True), + # if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value. + else_=True + )) \ + .filter(UserWorkflowStepExecution.error_status.is_(None)) \ + .order_by(UserWorkflowStepExecution.creation_date.asc()) \ + .all() + except Exception as e: + raise Exception(f"{self.__class__.__name__} :: get_valid_executions_of_a_step :: {e}") \ No newline at end of file diff --git a/mojodex_core/entities/user_workflow_step_execution.py b/mojodex_core/entities/user_workflow_step_execution.py index 83951f7d..4031d592 100644 --- a/mojodex_core/entities/user_workflow_step_execution.py +++ b/mojodex_core/entities/user_workflow_step_execution.py @@ -86,16 +86,22 @@ def learn_instruction(self, instruction): raise Exception(f"{self.__class__.__name__} :: learn_instruction :: {e}") @property - def result(self): + def user_workflow_step_execution_result(self): try: - # find the last associated step result if any session = object_session(self) step_result = session.query(MdUserWorkflowStepExecutionResult) \ .filter( MdUserWorkflowStepExecutionResult.user_workflow_step_execution_fk == self.user_workflow_step_execution_pk) \ .order_by(MdUserWorkflowStepExecutionResult.creation_date.desc()) \ .first() - return step_result.result if step_result else None + return step_result + except Exception as e: + raise Exception(f"{self.__class__.__name__} :: user_workflow_step_execution_result :: {e}") + + @property + def result(self): + try: + return self.user_workflow_step_execution_result.result if self.user_workflow_step_execution_result else None except Exception as e: raise Exception(f"{self.__class__.__name__} :: parameter :: {e}") diff --git a/mojodex_core/entities/workflow_step.py b/mojodex_core/entities/workflow_step.py index 8d878beb..c31f7066 100644 --- a/mojodex_core/entities/workflow_step.py +++ b/mojodex_core/entities/workflow_step.py @@ -38,6 +38,34 @@ def get_definition_in_language(self, language_code): except Exception as e: raise Exception(f"{self.__class__.__name__} :: get_definition_in_language :: {e}") + + @property + def dependency_step(self): + """The dependency step of a workflow step is the step of its workflow which rank is the previous one.""" + try: + session = object_session(self) + return session.query(MdWorkflowStep) \ + .filter(MdWorkflowStep.task_fk == self.task_fk) \ + .filter(MdWorkflowStep.rank == self.rank - 1) \ + .first() + except Exception as e: + raise Exception(f"{self.__class__.__name__} :: dependency_step :: {e}") + + @property + def next_step(self): + """ + The next step of a workflow step is the step of its workflow which rank is the next one. + :return: + """ + try: + session = object_session(self) + return session.query(MdWorkflowStep) \ + .filter(MdWorkflowStep.task_fk == self.task_fk) \ + .filter(MdWorkflowStep.rank == self.rank + 1) \ + .first() + except Exception as e: + raise Exception(f"{self.__class__.__name__} :: next_step :: {e}") + def _execute(self, parameter: dict, learned_instructions: dict, initial_parameters: dict, past_validated_steps_results: List[dict], user_id: str, user_task_execution_pk: int, task_name_for_system: str, session_id: str):