Skip to content

Commit

Permalink
fix(task-graph): fix task deduplication
Browse files Browse the repository at this point in the history
Fixed a regression that allowed more than one pending (i.e. not
in-progress) task to be added to `TaskGraph` for a given `key`.

The regression was introduced when the add & process queues were merged
into a single op queue (for valid reasons). It could cause repeated file
changes to result in unnecessarily repeated builds, deploys, etc.

After this fix, at most one task can be pending for a given `key`.
  • Loading branch information
thsig authored and edvald committed Jul 9, 2019
1 parent bf6f3a6 commit 6979f8b
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions garden-service/src/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as PQueue from "p-queue"
import chalk from "chalk"
import * as yaml from "js-yaml"
import hasAnsi = require("has-ansi")
import { merge, padEnd, pick, flatten } from "lodash"
import { flatten, merge, padEnd, pick } from "lodash"
import { BaseTask, TaskDefinitionError } from "./tasks/base"

import { LogEntry, LogEntryMetadata, TaskLogStatus } from "./logger/log-entry"
Expand Down Expand Up @@ -48,8 +48,9 @@ export const TASK_CONCURRENCY = (concurrencyFromEnv && parseInt(concurrencyFromE
export class TaskGraph {
private roots: TaskNodeMap
private index: TaskNodeMap

private inProgress: TaskNodeMap
private pendingKeys: Set<string>

private logEntryMap: LogEntryMap

/**
Expand All @@ -65,14 +66,26 @@ export class TaskGraph {
this.roots = new TaskNodeMap()
this.index = new TaskNodeMap()
this.inProgress = new TaskNodeMap()
this.pendingKeys = new Set()
this.taskDependencyCache = {}
this.resultCache = new ResultCache()
this.opQueue = new PQueue({ concurrency: 1 })
this.logEntryMap = {}
}

/**
 * Queues the given tasks for processing and resolves with the results.
 *
 * At most one pending (i.e. not in-progress) task is allowed for a given key at any
 * given time. Tasks whose key is already pending are therefore not queued again; this
 * includes tasks that share a key with an earlier task in the same `tasks` array.
 *
 * @param tasks the tasks the caller wants processed
 * @returns the results resolved by `processTasksInternal`, which is handed the keys of
 *   all requested tasks (not only the ones queued by this call)
 */
async process(tasks: BaseTask[]): Promise<TaskResults> {
  // Check and record each key in a single pass, so that a key repeated within `tasks`
  // is deduplicated as well (a filter followed by a separate add loop would let both through).
  const tasksToProcess: BaseTask[] = []
  for (const task of tasks) {
    const key = task.getKey()
    if (!this.pendingKeys.has(key)) {
      this.pendingKeys.add(key)
      tasksToProcess.push(task)
    }
  }

  // Regardless of whether a task was queued by this call or skipped as a duplicate, we want
  // to return the latest result for each requested task, so every requested key is passed on.
  const resultKeys = tasks.map(t => t.getKey())

  return this.opQueue.add(() => this.processTasksInternal(tasksToProcess, resultKeys))
}

/**
Expand Down Expand Up @@ -133,7 +146,7 @@ export class TaskGraph {
/**
* Process the graph until it's complete.
*/
private async processTasksInternal(tasks: BaseTask[]): Promise<TaskResults> {
private async processTasksInternal(tasks: BaseTask[], resultKeys: string[]): Promise<TaskResults> {
for (const task of tasks) {
await this.addTask(task)
}
Expand Down Expand Up @@ -187,6 +200,7 @@ export class TaskGraph {
pick(results, dependencyBaseKeys))

try {
this.pendingKeys.delete(task.getKey())
this.garden.events.emit("taskProcessing", {
startedAt: new Date(),
key: task.getKey(),
Expand Down Expand Up @@ -219,6 +233,15 @@ export class TaskGraph {

this.rebuild()

for (const resultKey of resultKeys) {
if (!results[resultKey]) {
// We know there's a cached result for resultKey, since each key in resultKeys
// corresponds to a task that was processed during this run of processTasks, or
// during a previous run of processTasks. See the process method above for details.
results[resultKey] = this.resultCache.getNewest(resultKey)!
}
}

return results
}

Expand Down Expand Up @@ -255,6 +278,7 @@ export class TaskGraph {
/**
 * Stops tracking `node`: removes it from the index and in-progress node maps,
 * then deletes its key from the set of pending keys.
 */
private remove(node: TaskNode) {
  for (const nodeMap of [this.index, this.inProgress]) {
    nodeMap.removeNode(node)
  }
  this.pendingKeys.delete(node.getKey())
}

// Recursively remove node's dependants, without removing node.
Expand Down

0 comments on commit 6979f8b

Please sign in to comment.