ECS Executor - Overriding Additional ECS Task Properties #35490

Closed
1 of 2 tasks
o-nikolas opened this issue Nov 6, 2023 · 18 comments · Fixed by #37137
Assignees
Labels
kind:feature Feature Requests provider:amazon-aws AWS/Amazon - related issues

Comments

@o-nikolas
Contributor

Description

From feedback here:

The executor config is scoped to overrides.containerOverrides. However there are relevant properties outside of overrides.containerOverrides that users may want to change.

For example, our ECS Cluster is actually composed of 3 capacity providers: A General-Purpose Capacity Provider (which is our cluster's default provider and runs on M7g instances), Memory-Optimized (R7g instances) and Compute-Optimized (C7g instances). My version of the ECS Executor allows users to set the appropriate Capacity Provider via the operator's executor_config param so that we can run our jobs in the most cost-efficient environment.
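As a rough illustration of the request, a per-task override of this kind might look like the sketch below. The provider name `memory-optimized` is hypothetical, and whether the executor accepts this key in `executor_config` is exactly what this issue asks for. The list-of-dicts shape follows the `capacityProviderStrategy` parameter of the ECS RunTask API.

```python
# Hypothetical per-task executor_config that selects a capacity provider.
# "memory-optimized" is a made-up provider name used only for illustration.
executor_config = {
    "capacityProviderStrategy": [
        {"capacityProvider": "memory-optimized", "weight": 1},
    ],
}
```

Note that ECS RunTask does not accept `launchType` together with `capacityProviderStrategy`, so a base configuration that sets `launchType` would need it removed for such a task.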

There are several other properties that Airflow users may want to set at the task level, such as:

Use case/motivation

No response

Related issues

#34381

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@o-nikolas o-nikolas added kind:feature Feature Requests needs-triage label for new issues that we didn't triage yet provider:amazon-aws AWS/Amazon - related issues and removed needs-triage label for new issues that we didn't triage yet labels Nov 6, 2023
@o-nikolas o-nikolas self-assigned this Nov 10, 2023
@o-nikolas
Contributor Author

I started working on this one and there is a bit of nuance that I didn't expect.

Many of the config options are lists of dicts. Take tags for example, the type is:

tags=[
    {
        'key': 'string',
        'value': 'string'
    },
],

So if you have a base set of run_task_kwargs configured in your airflow config such as:

AIRFLOW__AWS_ECS_EXECUTOR__RUN_TASK_KWARGS='{
"tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}],
"startedBy": "Niko",
"overrides": {"containerOverrides": [{"memory": 500, "environment":[{"name": "FOO", "value": "BAR"}]}]}}'

And in your executor_config on a particular task you specify:

@task(
    executor_config={
        "tags": [{"key": "FOO", "value": "BAR"}],
        "overrides": {"containerOverrides": [{"memory": 740}]},
    }
)
def hello_world():
    print("hello_world")

What should the final kwargs sent to ECS for tags be?

  • The sum of the two: tags=[{'key': 'fish', 'value': 'banana'}, {'key': 'peanut', 'value': 'sauce'}, {"key": "FOO", "value": "BAR"}]
  • A zipping of the two, where we update each dict in the list by index (copying/leaving the remainder if one list is smaller than the other): tags=[{'key': 'FOO', 'value': 'BAR'}, {'key': 'peanut', 'value': 'sauce'}]
  • A complete override by the executor_config: "tags": [{'key': 'FOO', 'value': 'BAR'}]
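The three candidate behaviours for the tags list can be compared side by side. This is only a sketch; the function names `concatenate`, `zip_update` and `replace` are invented for illustration and are not part of the executor.

```python
from copy import deepcopy
from itertools import zip_longest


def concatenate(base, override):
    """Keep every base entry and append the override entries."""
    return deepcopy(base) + deepcopy(override)


def zip_update(base, override):
    """Update dicts pairwise by index, keeping any unmatched remainder."""
    merged = []
    for base_item, override_item in zip_longest(base, override):
        item = dict(base_item or {})
        item.update(override_item or {})
        merged.append(item)
    return merged


def replace(base, override):
    """The override list wins outright; the base list is discarded."""
    return deepcopy(override)


base_tags = [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}]
task_tags = [{"key": "FOO", "value": "BAR"}]

print(concatenate(base_tags, task_tags))
print(zip_update(base_tags, task_tags))
print(replace(base_tags, task_tags))
```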

There are many other configs that are lists of dicts in the ECS run_task API, I'm not sure one answer fits all of them. containerOverrides for example cannot be additive, we must update each dict with what is provided in the executor_config since each index in the list maps to a container, so a zipping update to each corresponding dict by index makes the most sense. However, something like tags above seems like it could be a different approach. We must evaluate many of the other configs as well (the full list is here).

I'd very much appreciate some input from @ferruzzi @shubham22 @syedahsn

As well as from @mshober who has an implementation of this, so I'd be interested to hear what approach they took and how it's working out for them:

My version of the ECS Executor allows users to set the appropriate Capacity Provider via the operator's executor_config param

@mshober

mshober commented Nov 10, 2023

Thanks for starting this @o-nikolas!

It is important to note that you can use propagateTags=TASK_DEFINITION as a way of setting default tags on all the tasks that Airflow launches. I haven't implemented tagging for my own environment yet, but that would be my approach to global tagging. That would render setting tags via run_task_kwargs obsolete, and then tags set with executor_config could always be the complete set of tags that are passed to the RunTask API.

I'm also personally not a huge fan of the RUN_TASK_KWARGS config option. We already have explicit config options for several attributes (cluster, networkConfiguration, taskDefinition, platformVersion, launchType). I'd rather have more of the same. If we dictate what attributes can be set on a config level then it makes handling the executor_config much simpler. For example, I don't see the need of having overrides supported as a global config option when all of those properties can be set in the task definition.

As well as from @mshober who has an implementation of this, so I'd be interested to hear what approach they took and how it's working out for them:

My implementation is very specific and abstracted to my use case:

from copy import deepcopy

# Method excerpt from a custom ECS executor; the type aliases and
# self.get_container are defined elsewhere and are not shown here.
def _run_task_kwargs(self, task_id: TaskInstanceKeyType, cmd: CommandType, queue: str, exec_config: ExecutorConfigType) -> dict:
    # Site-specific keys read from the operator's executor_config.
    capacity_provider = exec_config.get("capacity_provider")
    memory_reservation_mib: int | None = exec_config.get("memory_reservation_mib")
    memory_limit_mib: int | None = exec_config.get("memory_limit_mib")
    cpu_reservation: int | None = exec_config.get("cpu_reservation")

    # Copy the base kwargs so per-task changes do not leak between tasks.
    run_task_api = deepcopy(self.run_task_kwargs)
    container_override = self.get_container(run_task_api["overrides"])
    container_override["command"] = cmd
    # Apply only the overrides that the task actually provided.
    if cpu_reservation is not None:
        container_override["cpu"] = int(cpu_reservation)
    if memory_reservation_mib is not None:
        container_override["memoryReservation"] = int(memory_reservation_mib)
    if memory_limit_mib is not None:
        container_override["memory"] = int(memory_limit_mib)
    if capacity_provider:
        run_task_api["capacityProviderStrategy"] = [{"capacityProvider": capacity_provider}]
    return run_task_api

so my code is likely not too helpful. I'll spend some time thinking about how it can be improved to fit all use cases.

@o-nikolas
Contributor Author

Thanks for starting this @o-nikolas!

It is important to note that you can use propagateTags=TASK_DEFINITION as a way of setting default tags on all the tasks that Airflow launches. I haven't implemented tagging for my own environment yet, but that would be my approach to global tagging. That would render setting tags via run_task_kwargs obsolete, and then tags set with executor_config could always be the complete set of tags that are passed to the RunTask API.

It's not so much that tags are the specific case that needs a workaround; they're just the example I chose. There are many others.

I'm also personally not a huge fan of the RUN_TASK_KWARGS config option. We already have explicit config options for several attributes (cluster, networkConfiguration, taskDefinition, platformVersion, launchType). I'd rather have more of the same. If we dictate what attributes can be set on a config level then it makes handling the executor_config much simpler. For example, I don't see the need of having overrides supported as a global config option when all of those properties can be set in the task definition.

I'm not in favour of this approach. There are many config options for ECS RunTask, and creating a first-class Airflow config option for each one is untenable, especially because that couples us very tightly to the ECS API: we would have to reflect every change it makes in our config options. The few important configs we have now, plus the catch-all of run_task_kwargs, give users full control without overburdening the maintenance and quality of the underlying code.
We have the same situation with Airflow operators (which usually wrap an underlying Boto API or two), those operators cannot possibly take every parameter that the underlying Boto API can, so we pull out some of the mandatory and important ones, and then often have a <boto_api>_kwargs arg that allows the user to specify anything else.

As well as from @mshober who has an implementation of this, so I'd be interested to hear what approach they took and how it's working out for them:

My implementation is very specific and abstracted to my use case:

...

so my code is likely not too helpful. I'll spend some time thinking about how it can be improved to fit all use cases.

Mmm, yeah, that does seem very specific to your use case. Thanks for sharing it either way!

@ferruzzi
Contributor

ferruzzi commented Dec 1, 2023

I think tags should be additive... AFAIK it will always be a list of len(1) and we should have an order of precedence; just tags[0].update() it so they get appended or overridden as expected seems reasonable to me. I think that's a pretty straightforward solution and aligns with how we handle tags elsewhere.

I'm pretty sure the container_overrides is the same and there can only be one container (for now?) so it would only ever be a len(1) list? In which case I like your solution. If I'm mistaken, then the issue becomes how can we identify the container? In your example above, if it is possible to have multiple containers, there is no indication about which container is being referenced so I don't think we can make that call. We can't assume that list[0] is always the same container if there is no container id stored to verify it. If there can only be one container then I don't see the issue with how we handle it everywhere else; set an order of precedence and update() up the chain to override or append as necessary.

I definitely agree that I wouldn't want an explicit config option for every possible field. Since we don't have any control over the boto API and don't get any kind of prior warning to changes they might make, we'll be stuck chasing it any time they decide to make a change. I suppose the flip side of that is that we're just passing the buck to the users instead, but as Niko mentioned it's how we handle it elsewhere so there's precedent.

@o-nikolas
Contributor Author

Thanks for taking the time to read and consider this issue @ferruzzi, it's a sticky one and I appreciate the brain cycles.

I think tags should be additive... AFAIK it will always be a list of len(1) and we should have an order of precedence; just tags[0].update() it so they get appended or overridden as expected seems reasonable to me. I think that's a pretty straightforward solution and aligns with how we handle tags elsewhere.

If we go the purely additive approach, it means that there's no way for a task to use executor_config to override the tags provided by the run_task_kwargs config template. Which seems like a sad compromise, but if everyone else agrees then that's fair.

I'm pretty sure the container_overrides is the same and there can only be one container (for now?) so it would only ever be a len(1) list? In which case I like your solution. If I'm mistaken, then the issue becomes how can we identify the container? In your example above, if it is possible to have multiple containers, there is no indication about which container is being referenced so I don't think we can make that call. We can't assume that list[0] is always the same container if there is no container id stored to verify it. If there can only be one container then I don't see the issue with how we handle it everywhere else; set an order of precedence and update() up the chain to override or append as necessary.

Perhaps we can assert that the number of items in the overrides from the executor_config matches the number we already have from the Airflow config options. Also, the override config itself contains sub-configs that are themselves lists, so you run into the same problem for each of those as well (additive, override, zip, etc.). Sigh.
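A minimal sketch of that length check followed by an index-wise update might look like this. The helper name `merge_container_overrides` is hypothetical, and the merge is deliberately shallow, so nested lists such as `environment` are still replaced wholesale, which is the sub-config problem described above.

```python
def merge_container_overrides(base, override):
    """Update each container override dict by index after checking the lengths match."""
    if len(base) != len(override):
        raise ValueError(
            f"executor_config has {len(override)} containerOverrides, "
            f"but the base config has {len(base)}"
        )
    # Shallow per-container update: keys from the task-level override win.
    return [{**base_item, **override_item} for base_item, override_item in zip(base, override)]


base = [{"memory": 500, "environment": [{"name": "FOO", "value": "BAR"}]}]
merged = merge_container_overrides(base, [{"memory": 740}])
print(merged)
```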

Plus there are other config options that are lists of dicts as well:

  • capacityProviderStrategy
  • placementConstraints
  • placementStrategy

@ferruzzi
Contributor

ferruzzi commented Dec 4, 2023

If we go the purely additive approach, it means that there's no way for a task to use executor_config to override the tags provided by the run_task_kwargs config template.

Overwrite, yes. Remove, no. So maybe the solution is to make tags additive with values getting overridden if the name already exists, and some way to explicitly remove a tag? "If tag name starts with ~ then pop it; else add/update it"? I'd have to double check, but I think a valid tag name can start with a - so we can't use that, but I don't think tag names can include ~.
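The "add/update, or pop when prefixed with ~" idea could be sketched as follows. The helper `merge_tags` and its `remove_prefix` parameter are made up for illustration, and whether `~` is actually unusable in AWS tag keys is not verified here.

```python
def merge_tags(base, override, remove_prefix="~"):
    """Merge two ECS-style tag lists by key; a prefixed key removes that tag."""
    merged = {tag["key"]: tag["value"] for tag in base}
    for tag in override:
        key = tag["key"]
        if key.startswith(remove_prefix):
            # "~fish" means: drop the tag named "fish" if it exists.
            merged.pop(key[len(remove_prefix):], None)
        else:
            # Otherwise append a new tag or overwrite the existing value.
            merged[key] = tag.get("value", "")
    return [{"key": key, "value": value} for key, value in merged.items()]


base_tags = [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}]
task_tags = [{"key": "FOO", "value": "BAR"}, {"key": "~fish"}, {"key": "peanut", "value": "butter"}]
print(merge_tags(base_tags, task_tags))
```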

Plus there are other config options that are lists of dicts as well:

It's entirely possible I haven't thought this through down to the bone, but it seems to me that all of the ones you listed should be override-with-hierarchy. It's possible I'm missing a use case, of course, but I don't think it's unreasonable to expect the finer-grained placementStrategy to override the more general one, for example. Tags are the only one that jumps out at me as really needing add/merge, where you might want to add a tag for which environment launched the task, and another tag at the DAG level stating something, and keep building up that list.

@o-nikolas
Contributor Author

If we go the purely additive approach, it means that there's no way for a task to use executor_config to override the tags provided by the run_task_kwargs config template.

Mmm, ya that's an interesting idea. I'm a little concerned with coming up with something too bespoke for each config type. That's a lot of code/complexity to maintain. Interested to hear what others think.

Overwrite, yes. Remove, no. So maybe the solution is to make tags additive with values getting overridden if the name already exists, and some way to explicitly remove a tag? "If tag name starts with ~ then pop it; else add/update it"? I'd have to double check, but I think a valid tag name can start with a - so we can't use that, but I don't think tag names can include ~.

Plus there are other config options that are lists of dicts as well:

It's entirely possible I haven't thought this through down to the bone, but it seems to me that all of the ones you listed should be override-with-hierarchy. It's possible I'm missing a use case, of course, but I don't think it's unreasonable to expect the finer-grained placementStrategy to override the more general one, for example. Tags are the only one that jumps out at me as really needing add/merge, where you might want to add a tag for which environment launched the task, and another tag at the DAG level stating something, and keep building up that list.

Interesting, I'm not sure of all the edge cases there either. I don't use those configs enough to say for sure, maybe I'll do some more testing with those to see what's up.

@o-nikolas o-nikolas changed the title Overriding Additional ECS Task Properties ECS Executor - Overriding Additional ECS Task Properties Dec 12, 2023
@shubham22

I'd vote for simplicity and keeping things consistent irrespective of the type of the field, which means executor_config should always override the run_task_kwargs for any field specified in both. If the user really cares about passing the tags that they've already defined in run_task_kwargs, then they can mention them again. If we go with an additive approach, it is one more piece of documentation that a user must read, and they wouldn't have a way to remove certain tags. Yes, you can design the system with ~ or something, but that's unnecessary logic on top of logic (additive) which is already inconsistent.

@o-nikolas
Contributor Author

I'd vote for simplicity and keeping things consistent irrespective of the type of the field, which means executor_config should always override the run_task_kwargs for any field specified in both. If the user really cares about passing the tags that they've already defined in run_task_kwargs, then they can mention them again. If we go with additive approach, it is another documentation that a user must read and they wouldn't have a way to remove certain tags. Yes, you can design the system with ~ or something, but that's unnecessary logic on top of the logic (additive) which is already inconsistent.

This approach is definitely enticing. Thoughts @syedahsn @ferruzzi?

@shubham22, regarding your approach. In my original example above I have overrides specified in the run_task_kwargs template and in the executor_config. Do you think the smaller executor_config should overwrite the run_task kwargs like option 1 or 2?

  1. {"containerOverrides": [{"memory": 740, "environment":[{"name": "FOO", "value": "BAR"}]}]}

  2. {"containerOverrides": [{"memory": 740}]}

@shubham22

shubham22 commented Dec 15, 2023

@o-nikolas - it would be 2. the entire containerOverrides will be replaced with what is provided in overrides as part of the executor_config, same with tags.
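Replacing at the first level is what a plain shallow dict merge gives in Python. The sketch below applies it to the example from earlier in the thread; the variable names are illustrative only.

```python
run_task_kwargs = {
    "tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 500, "environment": [{"name": "FOO", "value": "BAR"}]}]
    },
}
executor_config = {
    "tags": [{"key": "FOO", "value": "BAR"}],
    "overrides": {"containerOverrides": [{"memory": 740}]},
}

# Shallow merge: any top-level key present in executor_config replaces the
# whole value from run_task_kwargs, so the environment list is dropped.
final_kwargs = {**run_task_kwargs, **executor_config}
print(final_kwargs)
```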

@o-nikolas
Contributor Author

@o-nikolas - it would be 2. the entire containerOverrides will be replaced with what is provided in overrides as part of the executor_config, same with tags.

Interesting, that means if you want to just update one field for a particular task or DAG (like in this example, add some tags or update the memory for a particularly heavy task) you may wipe out 10s or even ~100 lines of config settings, and to avoid that you'd need to copy all those settings into the executor_config of the task or dag to just change one or two of them. That feels like a bit of a crummy user experience?

@shubham22

if you want to just update one field for a particular task or DAG (like in this example, add some tags or update the memory for a particularly heavy task) you may wipe out 10s or even ~100 lines of config settings

Yeah, that's a fair critique. To be on the same page, I was suggesting replacing at the first-level field. However, guessing at how this might be used, a user may want to change just the memory to accommodate a given task while keeping everything else the same. So, instead of replacing a field at the first level, we can replace a field at the lowest level. In that case as well, we should follow the same behavior for arrays and strings.

RUN_TASK_KWARGS = {
    "tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 500, "environment": [{"name": "FOO", "value": "BAR"}]}],
        "cpu": "5",
    },
}
executor_config = {
    "tags": [{"key": "FOO", "value": "BAR"}],
    "overrides": {
        "containerOverrides": [{"memory": 740}],
    },
}

Final config for task:

Config = {
    "tags": [{"key": "FOO", "value": "BAR"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 740}],
        "cpu": "5",
    },
}

Yes, you'd still need to provide the entire containerOverrides set.
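One way to read "replace at the lowest level" is a recursive update that descends into nested dicts but treats lists and scalars as leaves to be replaced whole. The sketch below follows that reading; `nested_update` is a hypothetical helper name, not the code from any PR.

```python
from copy import deepcopy


def nested_update(base, override):
    """Recursively update dicts; lists and scalars from override replace the base value."""
    result = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            # Both sides are dicts: merge key by key.
            result[key] = nested_update(result[key], value)
        else:
            # Lists, strings, numbers: the override value wins outright.
            result[key] = deepcopy(value)
    return result


run_task_kwargs = {
    "tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 500, "environment": [{"name": "FOO", "value": "BAR"}]}],
        "cpu": "5",
    },
}
executor_config = {
    "tags": [{"key": "FOO", "value": "BAR"}],
    "overrides": {"containerOverrides": [{"memory": 740}]},
}
final_config = nested_update(run_task_kwargs, executor_config)
print(final_config)
```

With this reading, `overrides.cpu` survives because `overrides` is a dict, while `tags` and `containerOverrides` are lists and are replaced in full.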

@o-nikolas
Contributor Author

if you want to just update one field for a particular task or DAG (like in this example, add some tags or update the memory for a particularly heavy task) you may wipe out 10s or even ~100 lines of config settings

Yeah, that's a fair critique. To be on the same page, I was suggesting replacing at the first-level field. However, guessing at how this might be used, a user may want to change just the memory to accommodate a given task while keeping everything else the same. So, instead of replacing a field at the first level, we can replace a field at the lowest level. In that case as well, we should follow the same behavior for arrays and strings.

RUN_TASK_KWARGS = {
	"tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}],
	"startedBy": "Niko",
	"overrides": {
		"containerOverrides": [{"memory": 500, "environment":[{"name": "FOO", "value": "BAR"}]}],
		"cpu": "5",
	}
}
executor_config={
    "tags": [{"key": "FOO", "value": "BAR"}],
    "overrides": {
    	"containerOverrides": [{"memory": 740}]
    },
}

Final config for task:

Config = {
"tags": [{"key": "FOO", "value": "BAR"}],
"startedBy": "Niko",
"overrides": {
	"containerOverrides":  [{"memory": 740}],
	"cpu": "5",
	}
}

Yes, you'd still need to provide entire containerOverrides set.

Yeah, this I'm more on board with. The final sticking point is exactly what you call out, though: containerOverrides can be very large, so having to specify the whole thing to change just one piece would be very frustrating.

Wild thought to consider: I wonder if we just keep the current behaviour and let people subclass this executor to change it if they see fit? Any direction we consider here feels very opinionated and has very clear downsides. I'm not convinced this is the right approach either, but I'm curious to hear what others think (CC @shubham22)

@ferruzzi
Contributor

Sorry, I've been dealing with laptop issues. I like where this is heading, in general I feel like the discussion you two had is leading the right way.

I guess the next logical step from there would be to do the same within containerOverrides. In the end maybe that might just end up looking like a recursive merge of the two configs, with override precedence going to the "most local" explicit value?

@syedahsn
Contributor

syedahsn commented Jan 18, 2024

Coming a bit late to the conversation, but I just want to make sure I'm following correctly. Is it true that from @shubham22 's example, the "best" final config would look something like this?

Config = {
    "tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}, {"key": "FOO", "value": "BAR"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 740, "environment": [{"name": "FOO", "value": "BAR"}]}],
        "cpu": "5",
    },
}

If so, I think this approach gets the best of both worlds in that the user doesn't have to repeat existing config, and can add or overwrite specific keys to customize individual tasks.

@shubham22

@syedahsn - I'm confused about which approach you're calling "best" here. The main difference between your config and my suggestion is the merge and replace of list configs that you're doing. My suggestion is that we should only merge configurations when they're not defined in executor_config, and we should never merge the values of the configurations (including when they are lists). In your suggestion, I can never get rid of RUN_TASK_KWARGS tags from my task, even if I want to.

@syedahsn
Contributor

The difference that I was pointing out between our suggestions was that if we don't specify a key in executor_config which exists in RUN_TASK_KWARGS, that key should not be removed from the final config, no matter what level it is at.

So in your example, even though executor_config did not include a startedBy key, it was still included in the final config. However, overrides was included in both RUN_TASK_KWARGS and executor_config, but because it is a dictionary, we look at the individual keys, and only change a value if the key is being specified in the executor_config.

So in the example, there is a cpu key in the overrides dictionary in the RUN_TASK_KWARGS config, but cpu is not in the overrides dictionary in executor_config, therefore we leave it as is. As a result, the final config has a cpu key in the overrides dictionary.

The difference between our approaches is what happens in containerOverrides. You are suggesting that because we are defining that in the executor_config, it should overwrite the existing value in RUN_TASK_KWARGS. My suggestion is to use the same logic that we are using in the overall config, and evaluate key by key. We would check to see if the key exists in the dictionary, and update it if it does, and add it if it doesn't.

As for the issue of removing a tag (or any key) from a dictionary, maybe we can use an empty value (i.e. "") as a way to remove an existing key from the config.

RUN_TASK_KWARGS = {
    "tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 500, "environment": [{"name": "FOO", "value": "BAR"}]}],
        "cpu": "5",
    },
}

executor_config = {
    "tags": [{"key": "FOO", "value": "BAR"}, {"key": "fish", "value": ""}],
    "overrides": {
        "containerOverrides": [{"memory": 740, "environment": [{"name": "FOO", "value": ""}, {"name": "fish", "value": "banana"}]}],
    },
}

The final config will be:

Config = {
    "tags": [{"key": "peanut", "value": "sauce"}, {"key": "FOO", "value": "BAR"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 740, "environment": [{"name": "fish", "value": "banana"}]}],
        "cpu": "5",
    },
}
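A key-by-key merge with "" as a removal marker could be sketched roughly as below. Several details are assumptions that the comment above does not spell out: list entries are matched on a `key` or `name` field when every entry has one, lists of dicts without such a field (like `containerOverrides` in this example) are merged by index, and all helper names are invented.

```python
from copy import deepcopy

ID_FIELDS = ("key", "name")  # assumed identity fields for list entries


def _id_field(items):
    """Return the identity field shared by every dict in items, if any."""
    for field in ID_FIELDS:
        if items and all(isinstance(i, dict) and field in i for i in items):
            return field
    return None


def _merge_keyed(base, override, field):
    """Merge list entries by identity; an empty "value" removes the entry."""
    merged = {item[field]: deepcopy(item) for item in base}
    for item in override:
        if item.get("value") == "":
            merged.pop(item[field], None)
        else:
            merged[item[field]] = deepcopy(item)
    return list(merged.values())


def merge(base, override):
    """Recursively merge override into base, evaluating key by key."""
    if isinstance(base, dict) and isinstance(override, dict):
        result = deepcopy(base)
        for key, value in override.items():
            result[key] = merge(result[key], value) if key in result else deepcopy(value)
        return result
    if isinstance(base, list) and isinstance(override, list):
        field = _id_field(base + override)
        if field is not None:
            return _merge_keyed(base, override, field)
        if all(isinstance(i, dict) for i in base + override):
            # No identity field: pair entries by index, keep any remainder.
            shared = [merge(b, o) for b, o in zip(base, override)]
            longer = base if len(base) > len(override) else override
            return shared + deepcopy(longer[len(shared):])
    return deepcopy(override)  # scalars and mismatched types: override wins


run_task_kwargs = {
    "tags": [{"key": "fish", "value": "banana"}, {"key": "peanut", "value": "sauce"}],
    "startedBy": "Niko",
    "overrides": {
        "containerOverrides": [{"memory": 500, "environment": [{"name": "FOO", "value": "BAR"}]}],
        "cpu": "5",
    },
}
executor_config = {
    "tags": [{"key": "FOO", "value": "BAR"}, {"key": "fish", "value": ""}],
    "overrides": {
        "containerOverrides": [
            {"memory": 740, "environment": [{"name": "FOO", "value": ""}, {"name": "fish", "value": "banana"}]}
        ]
    },
}
final_config = merge(run_task_kwargs, executor_config)
print(final_config)
```

The amount of special-casing needed even in this small sketch illustrates the maintenance concern raised earlier in the thread about bespoke per-field behaviour.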

@o-nikolas
Contributor Author

I ended up going with a very opinionated approach that closely approximates a standard Python dict update: what @shubham22 proposed here. It does have the drawbacks I raised in my reply to that comment, but I think its behaviour is the easiest to anticipate, since it is how a dict update would normally behave in Python. So this approach has the fewest hidden gotchas and the least bespoke behaviour, making it easy to maintain and transfer to other use cases.

PR is here: #37137

5 participants