Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
formats prefix to not be flowrun every time
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano committed Mar 22, 2024
1 parent 5fc8a3a commit b982163
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions prefect_aws/workers/ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class ECSJobConfiguration(BaseJobConfiguration):
)
configure_cloudwatch_logs: Optional[bool] = Field(default=None)
cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict)
cloudwatch_logs_prefix: Optional[str] = Field(default=None)
network_configuration: Dict[str, Any] = Field(default_factory=dict)
stream_output: Optional[bool] = Field(default=None)
task_start_timeout_seconds: int = Field(default=300)
Expand Down Expand Up @@ -507,6 +508,14 @@ class ECSVariables(BaseVariables):
" for available options. "
),
)
cloudwatch_logs_prefix: str = Field(
default=None,
description=(
"When `configure_cloudwatch_logs` is enabled, this setting may be used to"
" set a prefix for the log group. If not provided, the default prefix will"
" be `workpool_name_flow-run`."
),
)

network_configuration: Dict[str, Any] = Field(
default_factory=dict,
Expand Down Expand Up @@ -673,7 +682,7 @@ def _create_task_and_wait_for_start(

if not task_definition_arn:
task_definition = self._prepare_task_definition(
configuration, region=ecs_client.meta.region_name
configuration, region=ecs_client.meta.region_name, flow_run=flow_run
)
(
task_definition_arn,
Expand Down Expand Up @@ -1209,6 +1218,7 @@ def _prepare_task_definition(
self,
configuration: ECSJobConfiguration,
region: str,
flow_run: FlowRun,
) -> dict:
"""
Prepare a task definition by inferring any defaults and merging overrides.
Expand Down Expand Up @@ -1258,13 +1268,16 @@ def _prepare_task_definition(
container["environment"].remove(item)

if configuration.configure_cloudwatch_logs:
prefix = f"prefect-logs_{self._work_pool_name}_{flow_run.deployment_id}"
container["logConfiguration"] = {
"logDriver": "awslogs",
"options": {
"awslogs-create-group": "true",
"awslogs-group": "prefect",
"awslogs-region": region,
"awslogs-stream-prefix": configuration.name or "prefect",
"awslogs-stream-prefix": (
configuration.cloudwatch_logs_prefix or prefix
),
**configuration.cloudwatch_logs_options,
},
}
Expand Down Expand Up @@ -1574,6 +1587,11 @@ def _create_task_run(self, ecs_client: _ECSClient, task_run_request: dict) -> st
Returns the task run ARN.
"""
run = ecs_client.run_task(**task_run_request)
failures = run["failures"]
if failures:
raise RuntimeError(f"Failed to run ECS task: {failures}")

return ecs_client.run_task(**task_run_request)["tasks"][0]

def _task_definitions_equal(self, taskdef_1, taskdef_2) -> bool:
Expand Down

0 comments on commit b982163

Please sign in to comment.