Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue 149 and update workflow processor with entities #150

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 26 additions & 84 deletions backend/app/models/workflows/workflow_process_controller.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
from datetime import datetime

from jinja2 import Template

from app import server_socket, socketio_message_sender


from mojodex_core.tag_manager import TagManager
from mojodex_core.produced_text_managers.task_produced_text_manager import TaskProducedTextManager
from mojodex_core.entities.db_base_entities import MdMessage, MdUserTask, MdUserWorkflowStepExecutionResult, \
MdWorkflowStep
from mojodex_core.entities.db_base_entities import MdMessage
from mojodex_core.db import MojodexCoreDB, Session

from mojodex_core.entities.user_workflow_execution import UserWorkflowExecution
from sqlalchemy import case, and_

from mojodex_core.entities.user_workflow_step_execution import UserWorkflowStepExecution
from mojodex_core.mail import send_technical_error_email
from mojodex_core.task_execution_title_summary_generator import TaskExecutionTitleSummaryGenerator

from datetime import datetime

class WorkflowProcessController:
logger_prefix = "WorkflowProcessController :: "
Expand All @@ -28,7 +19,8 @@ def __del__(self):
def __init__(self, workflow_execution_pk):
try:
self.db_session = Session(MojodexCoreDB().engine)
self.workflow_execution: UserWorkflowExecution = self.db_session.query(UserWorkflowExecution).get(workflow_execution_pk)
self.workflow_execution: UserWorkflowExecution = self.db_session.query(UserWorkflowExecution).get(
workflow_execution_pk)
self._current_step = None
except Exception as e:
raise Exception(f"{self.logger_prefix} :: __init__ :: {e}")
Expand All @@ -52,50 +44,22 @@ def _get_next_step_execution_to_run(self):
self._current_step = self._generate_new_step_execution(self.workflow_execution.task.steps[0],
self.get_formatted_initial_parameters()) # of first step
return self._current_step

last_validated_step_execution = self.workflow_execution.past_valid_step_executions[-1]


if len(self.workflow_execution.past_valid_step_executions) > 1: # if it's not the 1st else no dependency as it was the first step
db_dependency_step = self.db_session.query(MdWorkflowStep) \
.join(MdUserTask, MdUserTask.task_fk == MdWorkflowStep.task_fk) \
.filter(MdUserTask.user_task_pk == self.workflow_execution.user_task_fk) \
.filter(MdWorkflowStep.rank == last_validated_step_execution.workflow_step.rank - 1) \
.first()

# find dependency step
db_dependency_step = last_validated_step_execution.workflow_step.dependency_step

# find last execution of dependency step
db_dependency_step_execution = self.db_session.query(UserWorkflowStepExecution) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \
.filter(UserWorkflowStepExecution.workflow_step_fk == db_dependency_step.workflow_step_pk) \
.order_by(UserWorkflowStepExecution.creation_date.desc()) \
.first()
db_dependency_step_execution_result = self.db_session.query(MdUserWorkflowStepExecutionResult) \
.filter(
MdUserWorkflowStepExecutionResult.user_workflow_step_execution_fk == db_dependency_step_execution.user_workflow_step_execution_pk) \
.order_by(MdUserWorkflowStepExecutionResult.creation_date.desc()) \
.first()

# load all validated step executions of current step:
current_step_executions_count = self.db_session.query(UserWorkflowStepExecution) \
.join(MdWorkflowStep,
UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == last_validated_step_execution.workflow_step.workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`

(MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),

# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.count()
db_dependency_step_execution = self.workflow_execution.get_last_execution_of_a_step(
db_dependency_step.workflow_step_pk)
db_dependency_step_execution_result = db_dependency_step_execution.user_workflow_step_execution_result

# count all validated step executions of current step:
current_step_executions_count = self.workflow_execution.count_valid_step_executions_of_a_step(
last_validated_step_execution.workflow_step.workflow_step_pk)

# have all parameters been executed and validated?
if current_step_executions_count < len(db_dependency_step_execution_result.result):
Expand All @@ -104,12 +68,9 @@ def _get_next_step_execution_to_run(self):
current_parameter)
return self._current_step


# else, generate new step execution of next step
next_step = self.db_session.query(MdWorkflowStep) \
.join(MdUserTask, MdUserTask.task_fk == MdWorkflowStep.task_fk) \
.filter(MdUserTask.user_task_pk == self.workflow_execution.user_task_fk) \
.filter(MdWorkflowStep.rank == last_validated_step_execution.workflow_step.rank + 1) \
.first()
next_step = last_validated_step_execution.workflow_step.next_step
# Reached last rank order => there is no next step
if next_step is None:
return None # end of workflow
Expand Down Expand Up @@ -154,7 +115,7 @@ def end_workflow_execution(self):

self.add_state_message()
produced_text_pk, produced_text_version_pk, title, production = self._generate_produced_text()

title_tag_manager = TagManager("title")
draft_tag_manager = TagManager("draft")
# add it as a mojo_message
Expand Down Expand Up @@ -190,37 +151,18 @@ def end_workflow_execution(self):

def _generate_produced_text(self):
try:
# concatenation of results of last step's validated executions
# For now, production of a workflow is the concatenation of results of all last step's validated executions' results
last_step = self.workflow_execution.task.steps[-1]
validated_last_step_executions = self.db_session.query(UserWorkflowStepExecution) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.workflow_execution.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == last_step.workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`

(
MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),

# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)
) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.order_by(UserWorkflowStepExecution.creation_date.desc()) \
.all()
validated_last_step_executions = self.workflow_execution.get_valid_executions_of_a_step(last_step.workflow_step_pk)

production = "\n\n".join(
[list(step.result[0].values())[0] for step in validated_last_step_executions[::-1]])
production = "\n\n".join([list(step.result[0].values())[0] for step in validated_last_step_executions])

produced_text_manager = TaskProducedTextManager(self.workflow_execution.session_id,
self.workflow_execution.user.user_id,
self.workflow_execution.user_task_execution_pk,
self.workflow_execution.task.name_for_system)
produced_text_pk, produced_text_version_pk, title, production, text_type = produced_text_manager.save_produced_text(production, title="", text_type_pk=self.workflow_execution.task.output_text_type_fk)
self.workflow_execution.user.user_id,
self.workflow_execution.user_task_execution_pk,
self.workflow_execution.task.name_for_system)
produced_text_pk, produced_text_version_pk, title, production, text_type = produced_text_manager.save_produced_text(
production, title="", text_type_pk=self.workflow_execution.task.output_text_type_fk)

return produced_text_pk, produced_text_version_pk, title, production

Expand Down
83 changes: 79 additions & 4 deletions mojodex_core/entities/user_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sqlalchemy import case
from sqlalchemy.sql import and_


class UserWorkflowExecution(UserTaskExecution):

@property
Expand Down Expand Up @@ -36,9 +37,9 @@ def past_valid_step_executions(self):
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`
(MdWorkflowStep.user_validation_required == True, UserWorkflowStepExecution.validated == True),

(MdWorkflowStep.user_validation_required == True, UserWorkflowStepExecution.validated == True),

# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)
Expand All @@ -50,7 +51,6 @@ def past_valid_step_executions(self):
except Exception as e:
raise Exception(f"{self.__class__.__name__}:: past_valid_step_executions :: {e}")


def get_steps_execution_json(self):
try:
validated_steps_json = [step_execution.to_json() for step_execution in self.past_valid_step_executions]
Expand All @@ -61,3 +61,78 @@ def get_steps_execution_json(self):
return validated_steps_json
except Exception as e:
raise Exception(f"{self.__class__.__name__}:: get_steps_execution_json :: {e}")

def count_valid_step_executions_of_a_step(self, workflow_step_pk):
"""
Count the number of valid step executions of a given step for the current workflow execution.
:param workflow_step_pk:
:return:
"""
try:
session = object_session(self)
return session.query(UserWorkflowStepExecution) \
.join(MdWorkflowStep,
UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`
(MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),
# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.count()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: count_valid_step_executions_of_a_step :: {e}")

def get_last_execution_of_a_step(self, workflow_step_pk):
"""
Get the last execution of a given step for the current workflow execution.
:param workflow_step_pk:
:return:
"""
try:
session = object_session(self)
return session.query(UserWorkflowStepExecution) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \
.filter(UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \
.order_by(UserWorkflowStepExecution.creation_date.desc()) \
.first()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: get_last_execution_of_a_step :: {e}")

def get_valid_executions_of_a_step(self,workflow_step_pk):
"""
Get the valid executions of a given step for the current workflow execution.
Ordered by creation date in ascending order. (The oldest first)
:param workflow_step_pk:
:return:
"""
try:
session = object_session(self)
return session.query(UserWorkflowStepExecution) \
.join(MdWorkflowStep,
UserWorkflowStepExecution.workflow_step_fk == MdWorkflowStep.workflow_step_pk) \
.filter(
UserWorkflowStepExecution.user_task_execution_fk == self.user_task_execution_pk) \
.filter(
UserWorkflowStepExecution.workflow_step_fk == workflow_step_pk) \
.filter(
case(
# When `MdWorkflowStep.user_validation_required` is `True`, we check that `MdUserWorkflowStepExecution.validated` is also `True`
(MdWorkflowStep.user_validation_required == True,
UserWorkflowStepExecution.validated == True),
# if `user_validation_required` is `False`, we don't care about the `validated` status, and the `MdUserWorkflowStepExecution` will be included in the results regardless of its `validated` value.
else_=True
)) \
.filter(UserWorkflowStepExecution.error_status.is_(None)) \
.order_by(UserWorkflowStepExecution.creation_date.asc()) \
.all()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: get_valid_executions_of_a_step :: {e}")
12 changes: 9 additions & 3 deletions mojodex_core/entities/user_workflow_step_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,22 @@ def learn_instruction(self, instruction):
raise Exception(f"{self.__class__.__name__} :: learn_instruction :: {e}")

@property
def result(self):
def user_workflow_step_execution_result(self):
try:
# find the last associated step result if any
session = object_session(self)
step_result = session.query(MdUserWorkflowStepExecutionResult) \
.filter(
MdUserWorkflowStepExecutionResult.user_workflow_step_execution_fk == self.user_workflow_step_execution_pk) \
.order_by(MdUserWorkflowStepExecutionResult.creation_date.desc()) \
.first()
return step_result.result if step_result else None
return step_result
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: user_workflow_step_execution_result :: {e}")

@property
def result(self):
try:
return self.user_workflow_step_execution_result.result if self.user_workflow_step_execution_result else None
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: parameter :: {e}")

Expand Down
28 changes: 28 additions & 0 deletions mojodex_core/entities/workflow_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,34 @@ def get_definition_in_language(self, language_code):
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: get_definition_in_language :: {e}")


@property
def dependency_step(self):
"""The dependency step of a workflow step is the step of its workflow which rank is the previous one."""
try:
session = object_session(self)
return session.query(MdWorkflowStep) \
.filter(MdWorkflowStep.task_fk == self.task_fk) \
.filter(MdWorkflowStep.rank == self.rank - 1) \
.first()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: dependency_step :: {e}")

@property
def next_step(self):
"""
The next step of a workflow step is the step of its workflow which rank is the next one.
:return:
"""
try:
session = object_session(self)
return session.query(MdWorkflowStep) \
.filter(MdWorkflowStep.task_fk == self.task_fk) \
.filter(MdWorkflowStep.rank == self.rank + 1) \
.first()
except Exception as e:
raise Exception(f"{self.__class__.__name__} :: next_step :: {e}")

def _execute(self, parameter: dict, learned_instructions: dict, initial_parameters: dict,
past_validated_steps_results: List[dict], user_id: str, user_task_execution_pk: int,
task_name_for_system: str, session_id: str):
Expand Down