Skip to content

Commit

Permalink
Merge pull request #150 from hoomano/kroussel/workflow_process_contro…
Browse files Browse the repository at this point in the history
…ller_refact_with_entities

fix issue 149 and update workflow processor with entities
  • Loading branch information
xbasset authored Jun 28, 2024
2 parents 780500a + 730d479 commit 18d268b
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 91 deletions.
110 changes: 26 additions & 84 deletions backend/app/models/workflows/workflow_process_controller.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
from datetime import datetime

from jinja2 import Template

from app import server_socket, socketio_message_sender


from mojodex_core.tag_manager import TagManager
from mojodex_core.produced_text_managers.task_produced_text_manager import TaskProducedTextManager
from mojodex_core.entities.db_base_entities import MdMessage, MdUserTask, MdUserWorkflowStepExecutionResult, \
MdWorkflowStep
from mojodex_core.entities.db_base_entities import MdMessage
from mojodex_core.db import MojodexCoreDB, Session

from mojodex_core.entities.user_workflow_execution import UserWorkflowExecution
from sqlalchemy import case, and_

from mojodex_core.entities.user_workflow_step_execution import UserWorkflowStepExecution
from mojodex_core.mail import send_technical_error_email
from mojodex_core.task_execution_title_summary_generator import TaskExecutionTitleSummaryGenerator

from datetime import datetime

class WorkflowProcessController:
logger_prefix = "WorkflowProcessController :: "
Expand All @@ -28,7 +19,8 @@ def __del__(self):
def __init__(self, workflow_execution_pk):
try:
self.db_session = Session(MojodexCoreDB().engine)
self.workflow_execution: UserWorkflowExecution = self.db_session.query(UserWorkflowExecution).get(workflow_execution_pk)
self.workflow_execution: UserWorkflowExecution = self.db_session.query(UserWorkflowExecution).get(
workflow_execution_pk)
self._current_step = None
except Exception as e:
raise Exception(f"{self.logger_prefix} :: __init__ :: {e}")
Expand All @@ -52,50 +44,22 @@ def _get_next_step_execution_to_run(self):
self._current_step = self._generate_new_step_execution(self.workflow_execution.task.steps[0],
self.get_formatted_initial_parameters()) # of first step
return self._current_step

last_validated_step_execution = self.workflow_execution.past_valid_step_executions[-1]


if len(self.workflow_execution.past_valid_step_executions) > 1: # if it's not the 1st else no dependency as it was the first step
db_dependency_step = self.db_session.query(MdWorkflowStep) \
.join(MdUserTask, MdUserTask.task_fk == MdWorkflowStep.task_fk) \
.filter(MdUserTask.user_task_pk == self.workflow_execution.user_task_fk) \
.filter(MdWorkflowStep.rank == last_validated_step_execution.workflow_step.rank - 1) \
.first()

# find dependency step
db_dependency_step = last_validated_step_execution.workflow_step.dependency_step

# find last execution of dependency step
db_dependency_step_execution = self.db_session.query(UserWorkflowStepExecution) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \
.filter(UserWorkflowStepExecution.workflow_step_fk == db_dependency_step.workflow_step_pk) \
.order_by(UserWorkflowStepExecution.creation_date.desc()) \
.first()
db_dependency_step_execution_result = self.db_session.query(MdUserWorkflowStepExecutionResult) \
.filter(
MdUserWorkflowStepExecutionResult.user_workflow_step_execution_fk == db_dependency_step_execution.user_workflow_step_execution_pk) \
.order_by(MdUserWorkflowStepExecutionResult.creation_date.desc()) \
.first()

# load all validated step executions of current step:
current_step_executions_count = self.db_session.query(UserWorkflowStepExecution) \
.join(MdWorkflowStep,
UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == last_validated_step_execution.workflow_step.workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`

(MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),

# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.count()
db_dependency_step_execution = self.workflow_execution.get_last_execution_of_a_step(
db_dependency_step.workflow_step_pk)
db_dependency_step_execution_result = db_dependency_step_execution.user_workflow_step_execution_result

# count all validated step executions of current step:
current_step_executions_count = self.workflow_execution.count_valid_step_executions_of_a_step(
last_validated_step_execution.workflow_step.workflow_step_pk)

# have all parameters been executed and validated?
if current_step_executions_count < len(db_dependency_step_execution_result.result):
Expand All @@ -104,12 +68,9 @@ def _get_next_step_execution_to_run(self):
current_parameter)
return self._current_step


# else, generate new step execution of next step
next_step = self.db_session.query(MdWorkflowStep) \
.join(MdUserTask, MdUserTask.task_fk == MdWorkflowStep.task_fk) \
.filter(MdUserTask.user_task_pk == self.workflow_execution.user_task_fk) \
.filter(MdWorkflowStep.rank == last_validated_step_execution.workflow_step.rank + 1) \
.first()
next_step = last_validated_step_execution.workflow_step.next_step
# Reached last rank order => there is no next step
if next_step is None:
return None # end of workflow
Expand Down Expand Up @@ -154,7 +115,7 @@ def end_workflow_execution(self):

self.add_state_message()
produced_text_pk, produced_text_version_pk, title, production = self._generate_produced_text()

title_tag_manager = TagManager("title")
draft_tag_manager = TagManager("draft")
# add it as a mojo_message
Expand Down Expand Up @@ -190,37 +151,18 @@ def end_workflow_execution(self):

def _generate_produced_text(self):
try:
# concatenation of results of last step's validated executions
# For now, production of a workflow is the concatenation of results of all last step's validated executions' results
last_step = self.workflow_execution.task.steps[-1]
validated_last_step_executions = self.db_session.query(UserWorkflowStepExecution) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == last_step.workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`

(
MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),

# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)
) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.order_by(UserWorkflowStepExecution.creation_date.desc()) \
.all()
validated_last_step_executions = self.workflow_execution.get_valid_executions_of_a_step(last_step.workflow_step_pk)

production = "\n\n".join(
[list(step.result[0].values())[0] for step in validated_last_step_executions[::-1]])
production = "\n\n".join([list(step.result[0].values())[0] for step in validated_last_step_executions])

produced_text_manager = TaskProducedTextManager(self.workflow_execution.session_id,
self.workflow_execution.user.user_id,
self.workflow_execution.user_task_execution_pk,
self.workflow_execution.task.name_for_system)
produced_text_pk, produced_text_version_pk, title, production, text_type = produced_text_manager.save_produced_text(production, title="", text_type_pk=self.workflow_execution.task.output_text_type_fk)
self.workflow_execution.user.user_id,
self.workflow_execution.user_task_execution_pk,
self.workflow_execution.task.name_for_system)
produced_text_pk, produced_text_version_pk, title, production, text_type = produced_text_manager.save_produced_text(
production, title="", text_type_pk=self.workflow_execution.task.output_text_type_fk)

return produced_text_pk, produced_text_version_pk, title, production

Expand Down
83 changes: 79 additions & 4 deletions mojodex_core/entities/user_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sqlalchemy import case
from sqlalchemy.sql import and_


class UserWorkflowExecution(UserTaskExecution):

@property
Expand Down Expand Up @@ -36,9 +37,9 @@ def past_valid_step_executions(self):
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`
(MdWorkflowStep.user_validation_required == True, UserWorkflowStepExecution.validated == True),

(MdWorkflowStep.user_validation_required == True, UserWorkflowStepExecution.validated == True),

# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)
Expand All @@ -50,7 +51,6 @@ def past_valid_step_executions(self):
except Exception as e:
raise Exception(f"{self.__class__.__name__}:: past_valid_step_executions :: {e}")


def get_steps_execution_json(self):
try:
validated_steps_json = [step_execution.to_json() for step_execution in self.past_valid_step_executions]
Expand All @@ -61,3 +61,78 @@ def get_steps_execution_json(self):
return validated_steps_json
except Exception as e:
raise Exception(f"{self.__class__.__name__}:: get_steps_execution_json :: {e}")

def count_valid_step_executions_of_a_step(self, workflow_step_pk):
"""
Count the number of valid step executions of a given step for the current workflow execution.
:param workflow_step_pk:
:return:
"""
try:
session = object_session(self)
return session.query(UserWorkflowStepExecution) \
.join(MdWorkflowStep,
UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`
(MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),
# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.count()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: count_valid_step_executions_of_a_step :: {e}")

def get_last_execution_of_a_step(self, workflow_step_pk):
"""
Get the last execution of a given step for the current workflow execution.
:param workflow_step_pk:
:return:
"""
try:
session = object_session(self)
return session.query(UserWorkflowStepExecution) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \
.filter(UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \
.order_by(UserWorkflowStepExecution.creation_date.desc()) \
.first()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: get_last_execution_of_a_step :: {e}")

def get_valid_executions_of_a_step(self,workflow_step_pk):
"""
Get the valid executions of a given step for the current workflow execution.
Ordered by creation date in ascending order. (The oldest first)
:param workflow_step_pk:
:return:
"""
try:
session = object_session(self)
return session.query(UserWorkflowStepExecution) \
.join(MdWorkflowStep,
UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`
(MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),
# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.order_by(UserWorkflowStepExecution.creation_date.asc()) \
.all()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: get_valid_executions_of_a_step :: {e}")
12 changes: 9 additions & 3 deletions mojodex_core/entities/user_workflow_step_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,22 @@ def learn_instruction(self, instruction):
raise Exception(f"{self.__class__.__name__} :: learn_instruction :: {e}")

@property
def result(self):
def user_workflow_step_execution_result(self):
try:
# find the last associated step result if any
session = object_session(self)
step_result = session.query(MdUserWorkflowStepExecutionResult) \
.filter(
MdUserWorkflowStepExecutionResult.user_workflow_step_execution_fk == self.user_workflow_step_execution_pk) \
.order_by(MdUserWorkflowStepExecutionResult.creation_date.desc()) \
.first()
return step_result.result if step_result else None
return step_result
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: user_workflow_step_execution_result :: {e}")

@property
def result(self):
try:
return self.user_workflow_step_execution_result.result if self.user_workflow_step_execution_result else None
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: parameter :: {e}")

Expand Down
28 changes: 28 additions & 0 deletions mojodex_core/entities/workflow_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,34 @@ def get_definition_in_language(self, language_code):
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: get_definition_in_language :: {e}")


@property
def dependency_step(self):
"""The dependency step of a workflow step is the step of its workflow which rank is the previous one."""
try:
session = object_session(self)
return session.query(MdWorkflowStep) \
.filter(MdWorkflowStep.task_fk == self.task_fk) \
.filter(MdWorkflowStep.rank == self.rank - 1) \
.first()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: dependency_step :: {e}")

@property
def next_step(self):
"""
The next step of a workflow step is the step of its workflow which rank is the next one.
:return:
"""
try:
session = object_session(self)
return session.query(MdWorkflowStep) \
.filter(MdWorkflowStep.task_fk == self.task_fk) \
.filter(MdWorkflowStep.rank == self.rank + 1) \
.first()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: next_step :: {e}")

def _execute(self, parameter: dict, learned_instructions: dict, initial_parameters: dict,
past_validated_steps_results: List[dict], user_id: str, user_task_execution_pk: int,
task_name_for_system: str, session_id: str):
Expand Down

0 comments on commit 18d268b

Please sign in to comment.