Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Stream ECS Worker flow run logs to the API (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
zanieb authored May 25, 2023
1 parent e474a08 commit 48f5ddf
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
23 changes: 12 additions & 11 deletions prefect_aws/workers/ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import yaml
from prefect.docker import get_prefect_image_name
from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound
from prefect.logging.loggers import get_logger
from prefect.server.schemas.core import FlowRun
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.workers.base import (
Expand Down Expand Up @@ -318,6 +317,10 @@ def cloudwatch_logs_options_requires_configure_cloudwatch_logs(


class ECSVariables(BaseVariables):
"""
Variables for templating an ECS job.
"""

task_definition_arn: Optional[str] = Field(
default=None,
description=(
Expand Down Expand Up @@ -490,10 +493,16 @@ class ECSVariables(BaseVariables):


class ECSWorkerResult(BaseWorkerResult):
pass
"""
The result of an ECS job.
"""


class ECSWorker(BaseWorker):
"""
A Prefect worker to run flow runs as ECS tasks.
"""

type = "ecs"
job_configuration = ECSJobConfiguration
job_configuration_variables = ECSVariables
Expand All @@ -506,14 +515,6 @@ class ECSWorker(BaseWorker):
_is_beta = True
_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1jbV4lceHOjGgunX15lUwT/db88e184d727f721575aeb054a37e277/aws.png?h=250" # noqa

def get_logger(self, flow_run: FlowRun):
"""
Get a logger for the given flow run.
"""
# This could stream to the API in the future; should be implemented on the base
# worker class
return get_logger("prefect.workers.ecs").getChild(slugify(flow_run.name))

async def run(
self,
flow_run: "FlowRun",
Expand All @@ -527,7 +528,7 @@ async def run(
self._get_session_and_client, configuration
)

logger = self.get_logger(flow_run)
logger = self.get_flow_run_logger(flow_run)

(
task_arn,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
boto3>=1.24.53
botocore>=1.27.53
prefect>=2.10.5
prefect>=2.10.9
mypy_boto3_s3>=1.24.94
mypy_boto3_secretsmanager>=1.26.49
4 changes: 3 additions & 1 deletion tests/workers/test_ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ async def test_task_definition_arn_with_variables_that_are_ignored(
)

async with ECSWorker(work_pool_name="test") as worker:
with caplog.at_level(logging.INFO, logger=worker.get_logger(flow_run).name):
with caplog.at_level(
logging.INFO, logger=worker.get_flow_run_logger(flow_run).name
):
result = await run_then_stop_task(worker, configuration, flow_run)

assert result.status_code == 0
Expand Down

0 comments on commit 48f5ddf

Please sign in to comment.