From b982163f092f85ae62afadd8cf25e727980ce1a0 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Mar 2024 11:00:19 -0500 Subject: [PATCH] formats prefix to not be flowrun every time --- prefect_aws/workers/ecs_worker.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index edbe843d..1ee7e274 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -259,6 +259,7 @@ class ECSJobConfiguration(BaseJobConfiguration): ) configure_cloudwatch_logs: Optional[bool] = Field(default=None) cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict) + cloudwatch_logs_prefix: Optional[str] = Field(default=None) network_configuration: Dict[str, Any] = Field(default_factory=dict) stream_output: Optional[bool] = Field(default=None) task_start_timeout_seconds: int = Field(default=300) @@ -507,6 +508,14 @@ class ECSVariables(BaseVariables): " for available options. " ), ) + cloudwatch_logs_prefix: str = Field( + default=None, + description=( + "When `configure_cloudwatch_logs` is enabled, this setting may be used to" + " set a prefix for the log group. If not provided, the default prefix will" + " be `workpool_name_flow-run`." + ), + ) network_configuration: Dict[str, Any] = Field( default_factory=dict, @@ -673,7 +682,7 @@ def _create_task_and_wait_for_start( if not task_definition_arn: task_definition = self._prepare_task_definition( - configuration, region=ecs_client.meta.region_name + configuration, region=ecs_client.meta.region_name, flow_run=flow_run ) ( task_definition_arn, @@ -1209,6 +1218,7 @@ def _prepare_task_definition( self, configuration: ECSJobConfiguration, region: str, + flow_run: FlowRun, ) -> dict: """ Prepare a task definition by inferring any defaults and merging overrides. @@ -1258,13 +1268,16 @@ def _prepare_task_definition( container["environment"].remove(item) if configuration.configure_cloudwatch_logs: + prefix = f"prefect-logs_{self._work_pool_name}_{flow_run.deployment_id}" container["logConfiguration"] = { "logDriver": "awslogs", "options": { "awslogs-create-group": "true", "awslogs-group": "prefect", "awslogs-region": region, - "awslogs-stream-prefix": configuration.name or "prefect", + "awslogs-stream-prefix": ( + configuration.cloudwatch_logs_prefix or prefix + ), **configuration.cloudwatch_logs_options, }, } @@ -1574,6 +1587,11 @@ def _create_task_run(self, ecs_client: _ECSClient, task_run_request: dict) -> st Returns the task run ARN. """ + run = ecs_client.run_task(**task_run_request) + failures = run["failures"] + if failures: + raise RuntimeError(f"Failed to run ECS task: {failures}") + return ecs_client.run_task(**task_run_request)["tasks"][0] def _task_definitions_equal(self, taskdef_1, taskdef_2) -> bool: