From 58f98e31068f1edfafa7e6d4704c2dacb2b333cc Mon Sep 17 00:00:00 2001 From: Zanie Adkins Date: Wed, 24 May 2023 10:28:26 -0500 Subject: [PATCH 1/2] Stream ECS Worker flow run logs to the API --- prefect_aws/workers/ecs_worker.py | 23 ++++++++++++----------- requirements.txt | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index 3067f3eb..06013972 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -60,7 +60,6 @@ import yaml from prefect.docker import get_prefect_image_name from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound -from prefect.logging.loggers import get_logger from prefect.server.schemas.core import FlowRun from prefect.utilities.asyncutils import run_sync_in_worker_thread from prefect.workers.base import ( @@ -318,6 +317,10 @@ def cloudwatch_logs_options_requires_configure_cloudwatch_logs( class ECSVariables(BaseVariables): + """ + Variables for templating an ECS job. + """ + task_definition_arn: Optional[str] = Field( default=None, description=( @@ -490,10 +493,16 @@ class ECSVariables(BaseVariables): class ECSWorkerResult(BaseWorkerResult): - pass + """ + The result of an ECS job. + """ class ECSWorker(BaseWorker): + """ + A Prefect worker to run flow runs as ECS tasks. + """ + type = "ecs" job_configuration = ECSJobConfiguration job_configuration_variables = ECSVariables @@ -506,14 +515,6 @@ class ECSWorker(BaseWorker): _is_beta = True _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1jbV4lceHOjGgunX15lUwT/db88e184d727f721575aeb054a37e277/aws.png?h=250" # noqa - def get_logger(self, flow_run: FlowRun): - """ - Get a logger for the given flow run. - """ - # This could stream to the API in the future; should be implemented on the base - # worker class - return get_logger("prefect.workers.ecs").getChild(slugify(flow_run.name)) - async def run( self, flow_run: "FlowRun", @@ -527,7 +528,7 @@ async def run( self._get_session_and_client, configuration ) - logger = self.get_logger(flow_run) + logger = self.get_flow_run_logger(flow_run) ( task_arn, diff --git a/requirements.txt b/requirements.txt index 2a0900f7..be12fc53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ boto3>=1.24.53 botocore>=1.27.53 -prefect>=2.10.5 +prefect>=2.10.9 mypy_boto3_s3>=1.24.94 mypy_boto3_secretsmanager>=1.26.49 \ No newline at end of file From 56148a6553c04efe081b45782d592021f03205ad Mon Sep 17 00:00:00 2001 From: Zanie Adkins Date: Wed, 24 May 2023 10:55:22 -0500 Subject: [PATCH 2/2] Fix tests --- tests/workers/test_ecs_worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/workers/test_ecs_worker.py b/tests/workers/test_ecs_worker.py index 21d6e085..6603cc07 100644 --- a/tests/workers/test_ecs_worker.py +++ b/tests/workers/test_ecs_worker.py @@ -670,7 +670,9 @@ async def test_task_definition_arn_with_variables_that_are_ignored( ) async with ECSWorker(work_pool_name="test") as worker: - with caplog.at_level(logging.INFO, logger=worker.get_logger(flow_run).name): + with caplog.at_level( + logging.INFO, logger=worker.get_flow_run_logger(flow_run).name + ): result = await run_then_stop_task(worker, configuration, flow_run) assert result.status_code == 0