Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stream ECS Worker flow run logs to the API #267

Merged
merged 3 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions prefect_aws/workers/ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import yaml
from prefect.docker import get_prefect_image_name
from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound
from prefect.logging.loggers import get_logger
from prefect.server.schemas.core import FlowRun
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.workers.base import (
Expand Down Expand Up @@ -318,6 +317,10 @@ def cloudwatch_logs_options_requires_configure_cloudwatch_logs(


class ECSVariables(BaseVariables):
"""
Variables for templating an ECS job.
"""

task_definition_arn: Optional[str] = Field(
default=None,
description=(
Expand Down Expand Up @@ -490,10 +493,16 @@ class ECSVariables(BaseVariables):


class ECSWorkerResult(BaseWorkerResult):
pass
"""
The result of an ECS job.
"""


class ECSWorker(BaseWorker):
"""
A Prefect worker to run flow runs as ECS tasks.
"""

type = "ecs"
job_configuration = ECSJobConfiguration
job_configuration_variables = ECSVariables
Expand All @@ -506,14 +515,6 @@ class ECSWorker(BaseWorker):
_is_beta = True
_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1jbV4lceHOjGgunX15lUwT/db88e184d727f721575aeb054a37e277/aws.png?h=250" # noqa

def get_logger(self, flow_run: FlowRun):
"""
Get a logger for the given flow run.
"""
# This could stream to the API in the future; should be implemented on the base
# worker class
return get_logger("prefect.workers.ecs").getChild(slugify(flow_run.name))

async def run(
self,
flow_run: "FlowRun",
Expand All @@ -527,7 +528,7 @@ async def run(
self._get_session_and_client, configuration
)

logger = self.get_logger(flow_run)
logger = self.get_flow_run_logger(flow_run)

(
task_arn,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
boto3>=1.24.53
botocore>=1.27.53
prefect>=2.10.5
prefect>=2.10.9
mypy_boto3_s3>=1.24.94
mypy_boto3_secretsmanager>=1.26.49
4 changes: 3 additions & 1 deletion tests/workers/test_ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ async def test_task_definition_arn_with_variables_that_are_ignored(
)

async with ECSWorker(work_pool_name="test") as worker:
with caplog.at_level(logging.INFO, logger=worker.get_logger(flow_run).name):
with caplog.at_level(
logging.INFO, logger=worker.get_flow_run_logger(flow_run).name
):
result = await run_then_stop_task(worker, configuration, flow_run)

assert result.status_code == 0
Expand Down