add workflow failure reason #1198

Merged (1 commit) on Nov 15, 2024
New file (31 additions): Alembic migration adding failure_reason to workflow_runs
@@ -0,0 +1,31 @@
"""Add failure_reason column to workflow_runs

Revision ID: 1909715536dc
Revises: b8f9e09e181d
Create Date: 2024-11-15 02:51:33.553177+00:00

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "1909715536dc"
down_revision: Union[str, None] = "b8f9e09e181d"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column("workflow_runs", sa.Column("failure_reason", sa.String(), nullable=True))
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("workflow_runs", "failure_reason")
    # ### end Alembic commands ###
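
The migration only adds a nullable column, so it is backwards compatible and needs no backfill. For reviewers who want to sanity-check it locally after running alembic upgrade head, here is a minimal sketch (not part of this PR); the DATABASE_URL environment variable and a synchronous SQLAlchemy URL are assumptions:

import os

import sqlalchemy as sa

# Assumes DATABASE_URL is a synchronous SQLAlchemy URL for the same database
# that Alembic targets (illustrative only, not part of the PR).
engine = sa.create_engine(os.environ["DATABASE_URL"])
columns = {col["name"]: col for col in sa.inspect(engine).get_columns("workflow_runs")}
assert "failure_reason" in columns, "migration 1909715536dc has not been applied"
assert columns["failure_reason"]["nullable"] is True

Rolling back is symmetric: alembic downgrade -1 drops the column via downgrade() above.
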
5 changes: 4 additions & 1 deletion skyvern/forge/sdk/db/client.py
@@ -1083,13 +1083,16 @@ async def create_workflow_run(
LOG.error("SQLAlchemyError", exc_info=True)
raise

async def update_workflow_run(self, workflow_run_id: str, status: WorkflowRunStatus) -> WorkflowRun | None:
async def update_workflow_run(
self, workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None
) -> WorkflowRun | None:
async with self.Session() as session:
workflow_run = (
await session.scalars(select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id))
).first()
if workflow_run:
workflow_run.status = status
workflow_run.failure_reason = failure_reason
await session.commit()
await session.refresh(workflow_run)
return convert_to_workflow_run(workflow_run)
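
With the new parameter, callers can persist a human-readable reason together with a terminal status; failure_reason defaults to None, so existing call sites keep working unchanged. A hedged usage sketch (the database handle, the reason string, and the WorkflowRunStatus import path and failed member are assumptions, not part of this PR):

from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus  # assumed import path


async def mark_run_failed(database, workflow_run_id: str, reason: str) -> None:
    # database is an instance of the DB client patched above (illustrative).
    await database.update_workflow_run(
        workflow_run_id=workflow_run_id,
        status=WorkflowRunStatus.failed,
        failure_reason=reason,
    )
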
1 change: 1 addition & 0 deletions skyvern/forge/sdk/db/models.py
@@ -211,6 +211,7 @@ class WorkflowRunModel(Base):
workflow_permanent_id = Column(String, nullable=False, index=True)
organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=False, index=True)
status = Column(String, nullable=False)
failure_reason = Column(String)
proxy_location = Column(Enum(ProxyLocation))
webhook_callback_url = Column(String)
totp_verification_url = Column(String)
1 change: 1 addition & 0 deletions skyvern/forge/sdk/db/utils.py
@@ -191,6 +191,7 @@ def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled:
workflow_id=workflow_run_model.workflow_id,
organization_id=workflow_run_model.organization_id,
status=WorkflowRunStatus[workflow_run_model.status],
failure_reason=workflow_run_model.failure_reason,
proxy_location=(
ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None
),
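
Because the column is nullable and the mapping above passes it straight through, rows created before this migration simply come back with failure_reason=None. A minimal round-trip sketch (the async session is an assumption; the imports follow the file paths in this diff):

from sqlalchemy import select

from skyvern.forge.sdk.db.models import WorkflowRunModel
from skyvern.forge.sdk.db.utils import convert_to_workflow_run


async def get_failure_reason(session, workflow_run_id: str) -> str | None:
    # session is an AsyncSession bound to the Skyvern database (illustrative).
    workflow_run = (
        await session.scalars(select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id))
    ).first()
    return convert_to_workflow_run(workflow_run).failure_reason if workflow_run else None
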
91 changes: 75 additions & 16 deletions skyvern/forge/sdk/workflow/models/block.py
@@ -26,6 +26,7 @@
FailedToNavigateToUrl,
MissingBrowserState,
MissingBrowserStatePage,
SkyvernException,
TaskNotFound,
UnexpectedTaskStatus,
)
@@ -39,7 +40,7 @@
get_path_for_workflow_download_directory,
)
from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory
from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus
from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.exceptions import (
@@ -83,6 +84,7 @@ class BlockResult:
output_parameter: OutputParameter
output_parameter_value: dict[str, Any] | list | str | None = None
status: BlockStatus | None = None
failure_reason: str | None = None


class Block(BaseModel, abc.ABC):
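
BlockResult now carries an optional failure_reason, and execute_safe (in a hunk further down) fills it in whenever a block raises. A minimal sketch of that classification, using a hypothetical SkyvernException subclass named DemoError purely for illustration:

class DemoError(SkyvernException):
    pass


def classify_failure(e: Exception) -> str:
    # Mirrors the execute_safe change below: generic exceptions get a generic
    # reason, Skyvern exceptions are labeled with their class name.
    if isinstance(e, SkyvernException):
        return f"unexpected SkyvernException({e.__class__.__name__})"
    return "unexpected exception"


assert classify_failure(DemoError("boom")) == "unexpected SkyvernException(DemoError)"
assert classify_failure(ValueError("boom")) == "unexpected exception"
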
@@ -116,11 +118,13 @@ async def record_output_parameter_value(
def build_block_result(
self,
success: bool,
failure_reason: str | None,
output_parameter_value: dict[str, Any] | list | str | None = None,
status: BlockStatus | None = None,
) -> BlockResult:
return BlockResult(
success=success,
failure_reason=failure_reason,
output_parameter=self.output_parameter,
output_parameter_value=output_parameter_value,
status=status,
@@ -145,7 +149,7 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
async def execute_safe(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
try:
return await self.execute(workflow_run_id, **kwargs)
except Exception:
except Exception as e:
LOG.exception(
"Block execution failed",
workflow_run_id=workflow_run_id,
@@ -156,7 +160,11 @@ async def execute_safe(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
if not workflow_run_context.has_value(self.output_parameter.key):
await self.record_output_parameter_value(workflow_run_context, workflow_run_id)
return self.build_block_result(success=False, status=BlockStatus.failed)

failure_reason = "unexpected exception"
if isinstance(e, SkyvernException):
failure_reason = f"unexpected SkyvernException({e.__class__.__name__})"
return self.build_block_result(success=False, failure_reason=failure_reason, status=BlockStatus.failed)

@abc.abstractmethod
def get_all_parameters(
@@ -233,6 +241,7 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
current_retry = 0
# initial value for will_retry is True, so that the loop runs at least once
will_retry = True
current_running_task: Task | None = None
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id)
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id)
# if the task url is parameterized, we need to get the value from the workflow run context
@@ -283,6 +292,7 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
task_order=task_order,
task_retry=task_retry,
)
current_running_task = task
organization = await app.DATABASE.get_organization(organization_id=workflow.organization_id)
if not organization:
raise Exception(f"Organization is missing organization_id={workflow.organization_id}")
@@ -353,6 +363,7 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
raise TaskNotFound(task.task_id)
if not updated_task.status.is_final():
raise UnexpectedTaskStatus(task_id=updated_task.task_id, status=updated_task.status)
current_running_task = updated_task

block_status_mapping = {
TaskStatus.completed: BlockStatus.completed,
@@ -375,6 +386,7 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value)
return self.build_block_result(
success=success,
failure_reason=updated_task.failure_reason,
output_parameter_value=output_parameter_value,
status=block_status_mapping[updated_task.status],
)
@@ -388,7 +400,10 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
organization_id=workflow.organization_id,
)
return self.build_block_result(
success=False, output_parameter_value=None, status=block_status_mapping[updated_task.status]
success=False,
failure_reason=updated_task.failure_reason,
output_parameter_value=None,
status=block_status_mapping[updated_task.status],
)
else:
current_retry += 1
@@ -413,12 +428,17 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
)
return self.build_block_result(
success=False,
failure_reason=updated_task.failure_reason,
output_parameter_value=output_parameter_value,
status=block_status_mapping[updated_task.status],
)

await self.record_output_parameter_value(workflow_run_context, workflow_run_id)
return self.build_block_result(success=False, status=BlockStatus.failed)
return self.build_block_result(
success=False,
status=BlockStatus.failed,
failure_reason=current_running_task.failure_reason if current_running_task else None,
)
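
With the changes above, a task block's result now surfaces the underlying task's failure_reason on every non-success path, including the retries-exhausted fallback that reads it off current_running_task. A hedged consumer sketch (task_block, workflow_run_id, and LOG are assumed to exist in the caller's scope):

block_result = await task_block.execute_safe(workflow_run_id=workflow_run_id)
if not block_result.success:
    # The reason is whatever the task reported, or None if no task ever ran.
    LOG.warning(
        "Task block did not succeed",
        block_status=block_result.status,
        failure_reason=block_result.failure_reason,
    )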


class ForLoopBlock(Block):
@@ -520,19 +540,37 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
num_loop_over_values=len(loop_over_values),
)
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, [])
return self.build_block_result(success=False, status=BlockStatus.terminated)
return self.build_block_result(
success=False,
failure_reason="No iterable value found for the loop block",
status=BlockStatus.terminated,
)

if not self.loop_blocks or len(self.loop_blocks) == 0:
LOG.info(
"No defined blocks to loop, terminating block",
block_type=self.block_type,
workflow_run_id=workflow_run_id,
num_loop_blocks=len(self.loop_blocks),
)
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, [])
return self.build_block_result(
success=False, failure_reason="No defined blocks to loop", status=BlockStatus.terminated
)

block_outputs: list[BlockResult] = []
for loop_idx, loop_over_value in enumerate(loop_over_values):
context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value)
for context_parameter in context_parameters_with_value:
workflow_run_context.set_value(context_parameter.key, context_parameter.value)
block_outputs = []
for block_idx, loop_block in enumerate(self.loop_blocks):
original_loop_block = loop_block
loop_block = loop_block.copy()
block_output = await loop_block.execute_safe(workflow_run_id=workflow_run_id)
if block_output.status == BlockStatus.canceled:
failure_message = f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} during loop {loop_idx} was canceled for workflow run {workflow_run_id}, canceling for loop"
LOG.info(
f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} was canceled for workflow run {workflow_run_id}, canceling for loop",
failure_message,
block_type=loop_block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
Expand All @@ -542,7 +580,10 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
workflow_run_context, workflow_run_id, outputs_with_loop_values
)
return self.build_block_result(
success=False, output_parameter_value=outputs_with_loop_values, status=BlockStatus.canceled
success=False,
failure_reason=failure_message,
output_parameter_value=outputs_with_loop_values,
status=BlockStatus.canceled,
)

loop_block = original_loop_block
@@ -580,6 +621,9 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
)
break

# at least one block must be executed in the loop
assert len(block_outputs) != 0

is_any_block_terminated = any([block_output.status == BlockStatus.terminated for block_output in block_outputs])
for_loop_block_status = BlockStatus.completed
if is_any_block_terminated:
@@ -588,7 +632,10 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
for_loop_block_status = BlockStatus.failed
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, outputs_with_loop_values)
return self.build_block_result(
success=success, output_parameter_value=outputs_with_loop_values, status=for_loop_block_status
success=success,
failure_reason=block_outputs[-1].failure_reason,
output_parameter_value=outputs_with_loop_values,
status=for_loop_block_status,
)
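
For the loop itself, the reason reported upward is taken from the last inner block that executed (block_outputs[-1]); the empty-iterable, no-blocks, and canceled paths set explicit messages instead. An illustrative caller (the names are assumptions, not from this PR):

loop_result = await for_loop_block.execute_safe(workflow_run_id=workflow_run_id)
if not loop_result.success:
    # Either one of the explicit loop-level messages, or the last inner block's reason.
    LOG.warning("For loop block did not succeed", failure_reason=loop_result.failure_reason)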


@@ -710,7 +757,9 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:

response = await self.send_prompt(self.prompt, parameter_values)
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, response)
return self.build_block_result(success=True, output_parameter_value=response, status=BlockStatus.completed)
return self.build_block_result(
success=True, failure_reason=None, output_parameter_value=response, status=BlockStatus.completed
)


class DownloadToS3Block(Block):
@@ -767,7 +816,9 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:

LOG.info("DownloadToS3Block: File downloaded and uploaded to S3", uri=uri)
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, uri)
return self.build_block_result(success=True, output_parameter_value=uri, status=BlockStatus.completed)
return self.build_block_result(
success=True, failure_reason=None, output_parameter_value=uri, status=BlockStatus.completed
)


class UploadToS3Block(Block):
@@ -841,7 +892,9 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:

LOG.info("UploadToS3Block: File(s) uploaded to S3", file_path=self.path)
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, s3_uris)
return self.build_block_result(success=True, output_parameter_value=s3_uris, status=BlockStatus.completed)
return self.build_block_result(
success=True, failure_reason=None, output_parameter_value=s3_uris, status=BlockStatus.completed
)


class SendEmailBlock(Block):
@@ -1109,14 +1162,18 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
LOG.error("SendEmailBlock: Failed to send email", exc_info=True)
result_dict = {"success": False, "error": str(e)}
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict)
return self.build_block_result(success=False, output_parameter_value=result_dict, status=BlockStatus.failed)
return self.build_block_result(
success=False, failure_reason=str(e), output_parameter_value=result_dict, status=BlockStatus.failed
)
finally:
if smtp_host:
smtp_host.quit()

result_dict = {"success": True}
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict)
return self.build_block_result(success=True, output_parameter_value=result_dict, status=BlockStatus.completed)
return self.build_block_result(
success=True, failure_reason=None, output_parameter_value=result_dict, status=BlockStatus.completed
)


class FileType(StrEnum):
@@ -1179,7 +1236,9 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
parsed_data.append(row)
# Record the parsed data
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, parsed_data)
return self.build_block_result(success=True, output_parameter_value=parsed_data, status=BlockStatus.completed)
return self.build_block_result(
success=True, failure_reason=None, output_parameter_value=parsed_data, status=BlockStatus.completed
)


BlockSubclasses = Union[
1 change: 1 addition & 0 deletions skyvern/forge/sdk/workflow/models/workflow.py
@@ -90,6 +90,7 @@ class WorkflowRun(BaseModel):
webhook_callback_url: str | None = None
totp_verification_url: str | None = None
totp_identifier: str | None = None
failure_reason: str | None = None

created_at: datetime
modified_at: datetime
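
The Pydantic schema field mirrors the new DB column and defaults to None, so existing API clients that never read or send failure_reason are unaffected. A hedged sketch of reading it off a finished run (app.WORKFLOW_SERVICE.get_workflow_run is taken from the block.py hunk above; LOG and the calling context are assumptions):

workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id)
if workflow_run.failure_reason:
    LOG.info(
        "Workflow run recorded a failure reason",
        workflow_run_id=workflow_run_id,
        failure_reason=workflow_run.failure_reason,
    )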