[ResponseOps][TaskManager] fix limited concurrency starvation in mget task claimer #187809

Merged
merged 16 commits into from
Aug 26, 2024

Conversation

pmuellr
Member

@pmuellr pmuellr commented Jul 8, 2024

resolves #184937

Summary

Fixes a problem where limited concurrency tasks could starve unlimited concurrency tasks, by using _msearch to search for limited concurrency tasks separately from unlimited concurrency tasks.
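As a rough illustration of the idea in the summary, the sketch below builds one search for all unlimited concurrency types plus one search per limited concurrency type, so a backlog of one limited type cannot crowd the others out of a single result page. The `SearchRequest` shape and the `buildSearches` helper are hypothetical names used only for this example; they are not the actual Task Manager code.

```typescript
// Hypothetical shapes for illustration; not the actual Task Manager types.
interface SearchRequest {
  taskTypes: string[]; // task types this inner search should match
  size: number; // maximum number of tasks to return
}

function buildSearches(
  unlimitedTypes: string[],
  unlimitedSize: number,
  limitedTypes: Map<string, number> // task type -> number of tasks to fetch
): SearchRequest[] {
  const searches: SearchRequest[] = [];

  // One search covering every unlimited concurrency type.
  if (unlimitedTypes.length > 0 && unlimitedSize > 0) {
    searches.push({ taskTypes: unlimitedTypes, size: unlimitedSize });
  }

  // One search per limited concurrency type, each with its own size.
  limitedTypes.forEach((size, type) => {
    if (size > 0) searches.push({ taskTypes: [type], size });
  });

  return searches;
}
```

Each entry would then become one inner search of a single _msearch request.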

Checklist

@pmuellr pmuellr added the Feature:Task Manager, Team:ResponseOps, and v8.16.0 labels Jul 8, 2024
@pmuellr
Member Author

pmuellr commented Jul 8, 2024

/ci

@pmuellr
Member Author

pmuellr commented Jul 9, 2024

/ci

@elasticmachine
Contributor

💛 Build succeeded, but was flaky

Failed CI Steps

Test Failures

  • [job] [logs] FTR Configs #30 / Rule execution logic API Detection Engine - Execution logic @ess @serverless @skipInServerlessMKI Machine Learning Detection Rule - Alert Suppression with an active ML Job with interval suppression duration performs no suppression if a single alert is generated

Metrics [docs]

✅ unchanged

History

@pmuellr
Member Author

pmuellr commented Jul 24, 2024

/ci

@pmuellr
Member Author

pmuellr commented Jul 26, 2024

/ci

@pmuellr pmuellr marked this pull request as ready for review July 26, 2024 15:46
@pmuellr pmuellr requested a review from a team as a code owner July 26, 2024 15:46
@elasticmachine
Contributor

Pinging @elastic/response-ops (Team:ResponseOps)

@pmuellr pmuellr added the release_note:skip label Jul 26, 2024
Contributor

@ymao1 ymao1 left a comment

Left a round of comments after code review. Will verify it next!


for (const response of responses) {
  if (response.status !== 200) {
    throw new Error(`Unexpected status code: ${response.status}`);
Contributor

should we pass this error to this.errors$ as well?

Member Author

Ya, though it's making me wonder, with the weird partial result error stuff from CCS calls, should we just skip over these? If just one of the queries is consistently bad for some reason while the others are ok, throwing here means we'd never pull any tasks, versus pulling tasks from every "inner search" except the one that failed.

I guess we'll figure that out ... :-)

Member Author

added in 842d402
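The trade-off discussed in this thread (failing the whole claim cycle versus skipping only the inner search that failed) can be sketched as follows. This is only an illustration of the "skip and report" option: the `InnerResponse` shape, the `collectHits` helper, and the `onError` callback (standing in for something like pushing to this.errors$) are assumptions, and the sketch does not claim to match what 842d402 actually does.

```typescript
// Hypothetical response shape for one inner search of an msearch call.
interface InnerResponse<T> {
  status: number;
  hits?: T[];
}

// Collect hits from every successful inner search; report failures
// through onError instead of throwing, so one bad search does not
// prevent tasks from the other searches from being claimed.
function collectHits<T>(
  responses: InnerResponse<T>[],
  onError: (error: Error) => void
): T[] {
  const hits: T[] = [];
  for (const response of responses) {
    if (response.status !== 200) {
      onError(new Error(`Unexpected status code: ${response.status}`));
      continue;
    }
    hits.push(...(response.hits ?? []));
  }
  return hits;
}
```

With this shape, a consistently failing inner search shows up as a stream of reported errors rather than as a claimer that never returns any tasks.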

@@ -504,6 +505,36 @@ export class TaskStore {
}
}

async msearch(opts: SearchOpts[] = []): Promise<FetchResult> {
Contributor

can we add a unit test for this?

Member Author

Just a note that this is NOT in 842d402, as I wanted to get the functional changes in first. I think this test will likely be pretty hairy and could probably be deferred (but I'm taking a look right now!)

Contributor

Added in 4b357c1.

}

const capacity = getCapacity(definition.type);
result.limitedTypes.set(definition.type, capacity);
Contributor

should we check for capacity=0 and not add to this map to avoid issuing a query with size 0?

Member Author

added in 842d402

RecognizedTask
);

const query = matchesClauses(queryForLimitedTasks, filterDownBy(InactiveTasks));
Contributor

do we need to add tasksWithPartitions to this clause?

Member Author

added in 842d402

}

const capacity = getCapacity(definition.type);
Contributor

The capacity returned here is now expressed in cost units (for the mget claim strategy), so for a normal cost task with maxConcurrency=1, it'll return 2. To convert capacity to a "number of tasks we can search for", I would divide it by the cost of the task:

const capacity = getCapacity(definition.type) / definition.cost

Member Author

added in 842d402
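A small worked version of the conversion suggested above, using the numbers from the comment (a normal cost task with maxConcurrency=1 reports a capacity of 2, so its cost is 2). The helper name `tasksToSearchFor` is hypothetical, and rounding down with Math.floor is an assumption of this sketch; the review comment only specifies the division.

```typescript
// Convert a capacity expressed in cost units into a number of tasks.
// Rounding down is an assumption of this sketch, so that a search is
// never sized for a task that would not fit in the remaining capacity.
function tasksToSearchFor(capacityInCost: number, taskCost: number): number {
  if (taskCost <= 0) return 0; // guard against division by zero
  return Math.max(0, Math.floor(capacityInCost / taskCost));
}
```

For the example in the comment, `tasksToSearchFor(2, 2)` gives 1 task; a result of 0 is the case the earlier thread suggests leaving out of the limited types map, so that no search with size 0 is issued.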


const { types, excludedTypes, removedTypes, getCapacity, definitions } = opts;
for (const type of types) {
if (excludedTypes.has(type)) continue;
Contributor

While adding an integration test (#189431), I noticed that this uses slightly different logic than the default task claimer and doesn't respect wildcards. I think we should use the same function that the default task claimer uses. I updated it in my integration test PR, so one of us will have a conflict!

Member Author

I ended up fixing this in the last main merge, since that was part of what the merge conflicted with.

@@ -15,23 +15,6 @@ import {
MustNotCondition,
} from './query_clauses';

export function taskWithLessThanMaxAttempts(type: string, maxAttempts: number): MustCondition {
Member Author

I noticed a few lingering references to search-related things regarding tasks running too many attempts. I believe this got resolved in #152841; though not sure if that applies to recurring tasks. @mikecote @ymao1 ??? In any case, this function was no longer being used, so figured I might as well delete it.

Contributor

Yea I don't think we enforced anything with max attempts for recurring task types.

Contributor

+1 shouldn't be used for recurring tasks, only ad-hoc (one time) tasks

@pmuellr
Member Author

pmuellr commented Aug 19, 2024

@elasticmachine merge upstream

@pmuellr
Member Author

pmuellr commented Aug 23, 2024

@elasticmachine merge upstream

@mikecote
Contributor

@elasticmachine merge upstream

  query,
  sort,
- size,
+ size: capacity,
Contributor

wonder if we should add a size multiplier here to account for possible conflicts?

Contributor

Yeah, we should, as the same concept used for mget applies here. I'll add that in the code.

Contributor

Added in 48806ca.
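To make the size multiplier idea concrete: other Kibana instances may claim some of the same tasks between the search and the claim, so the search asks for more candidates than there is capacity for. The sketch below assumes a multiplier of 4 purely for illustration; the real value of SIZE_MULTIPLIER_FOR_TASK_FETCH is not stated in this conversation, and `searchSize` is a hypothetical helper name.

```typescript
// Illustrative value only; the actual constant used by Task Manager
// is not given in this conversation.
const ASSUMED_SIZE_MULTIPLIER = 4;

// Over-fetch candidates so that tasks lost to version conflicts with
// other instances still leave enough candidates to fill capacity.
function searchSize(capacity: number, multiplier: number = ASSUMED_SIZE_MULTIPLIER): number {
  if (capacity <= 0) return 0;
  return Math.ceil(capacity * multiplier);
}
```

Because the search can now return more tasks than the capacity allows, the results still need to be trimmed back down before claiming.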

@@ -167,7 +166,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}

// apply limited concurrency limits (TODO: can currently starve other tasks)
- const candidateTasks = applyLimitedConcurrency(currentTasks, batches);
+ const candidateTasks = selectTasksByCapacity(currentTasks, batches);
Contributor

wonder if we still need this since we're searching directly using the msearch?

Contributor

I think because we will now apply the SIZE_MULTIPLIER_FOR_TASK_FETCH multiplier, we'll need to replicate the concurrency limitations in Kibana. I think this function will still be necessary but looking at the code, it should still consider the available capacity (tasks currently running).

Contributor

We discussed offline, and given that the code works with a concurrency of 1, we can follow up in #191301 to fix the code for concurrencies > 1.
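The point raised in this thread, that over-fetched results must still be trimmed to each type's concurrency limit while accounting for tasks already running, can be sketched as below. This is not the PR's selectTasksByCapacity implementation; the `Task` shape, the `selectByCapacity` name, and both map parameters are assumptions made for the example.

```typescript
// Hypothetical minimal task shape for illustration.
interface Task {
  id: string;
  type: string;
}

// Keep candidates in order, but never let a limited type exceed its
// limit once currently running tasks of that type are counted.
function selectByCapacity(
  candidates: Task[],
  limits: Map<string, number>, // type -> max concurrency; absent means unlimited
  running: Map<string, number> // type -> number of tasks currently running
): Task[] {
  const taken = new Map<string, number>();
  const selected: Task[] = [];
  for (const task of candidates) {
    const limit = limits.get(task.type);
    if (limit !== undefined) {
      const alreadyTaken = taken.get(task.type) ?? 0;
      const used = (running.get(task.type) ?? 0) + alreadyTaken;
      if (used >= limit) continue; // no room left for this type
      taken.set(task.type, alreadyTaken + 1);
    }
    selected.push(task);
  }
  return selected;
}
```

With a limit of 1 for a type and one task of that type already running, every candidate of that type is dropped, while candidates of unlimited types pass through unchanged.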

Contributor

@ymao1 ymao1 left a comment

LGTM

@kibana-ci
Collaborator

💛 Build succeeded, but was flaky

Failed CI Steps

Test Failures

  • [job] [logs] FTR Configs #62 / Cloud Security Posture Test adding Cloud Security Posture Integrations CSPM AWS CIS_AWS Single Manual Assume Role CIS_AWS Single Manual Assume Role Workflow

Metrics [docs]

✅ unchanged

History

To update your PR or re-run it, just comment with:
@elasticmachine merge upstream

@mikecote mikecote merged commit d3fdb7d into elastic:main Aug 26, 2024
38 checks passed
@kibanamachine kibanamachine added the backport:skip label Aug 26, 2024
Labels
backport:skip, Feature:Task Manager, release_note:skip, Team:ResponseOps, v8.16.0
Projects
None yet
6 participants