Load TaskDefinitions lazily when building taskGraph (#3562)
Previously, before the `Engine` struct was prepared for execution, Turbo
stored a map of `core.Task` instances to look up Task
Definitions.

This change removes that behavior and instead looks up
TaskDefinitions during task graph construction. This bakes in the
assumption that there can be more than one place for Tasks to be defined,
which is a prerequisite for composable configs.

This implementation hardcodes looking up the `turbo.json` from the root
workspace, but in the future we'll update it to look for the TaskDefinition
in the task's workspace.
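The heart of the change is a lazy, memoized lookup: rather than registering every `core.Task` up front, the engine resolves a task's definition from a workspace's `turbo.json` pipeline the first time one is requested, and caches each parsed pipeline so the file is read at most once. A minimal, self-contained sketch of that pattern (the types and the `loadPipeline` helper are illustrative stand-ins, not turbo's real APIs):

```go
package main

import "fmt"

// pipeline maps task names to task definitions, as parsed from a turbo.json.
type pipeline map[string]string

type engine struct {
	// pipelines caches the parsed pipeline per workspace, so the
	// filesystem is hit at most once per workspace.
	pipelines map[string]pipeline
}

// loadPipeline stands in for reading and parsing turbo.json from disk.
func loadPipeline(workspace string) pipeline {
	fmt.Println("reading turbo.json for", workspace)
	return pipeline{"build": "build definition"}
}

// getPipeline returns the cached pipeline for a workspace, loading it
// lazily on first use, mirroring the new getPipelineFromWorkspace below.
func (e *engine) getPipeline(workspace string) pipeline {
	if cached, ok := e.pipelines[workspace]; ok {
		return cached
	}
	p := loadPipeline(workspace)
	e.pipelines[workspace] = p
	return p
}

func main() {
	e := &engine{pipelines: map[string]pipeline{}}
	e.getPipeline("//") // reads from "disk"
	e.getPipeline("//") // served from the cache, no second read
}
```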
mehulkar committed Feb 3, 2023
1 parent cfd3ded commit b1d013a
Showing 3 changed files with 113 additions and 48 deletions.
123 changes: 85 additions & 38 deletions cli/internal/core/engine.go
@@ -27,24 +27,33 @@ type Visitor = func(taskID string) error
 // Engine contains both the DAG for the packages and the tasks and implements the methods to execute tasks in them
 type Engine struct {
 	// TaskGraph is a graph of package-tasks
-	TaskGraph *dag.AcyclicGraph
-	// Tasks are a map of tasks in the engine
-	Tasks map[string]*Task
+	TaskGraph        *dag.AcyclicGraph
 	PackageTaskDeps  map[string][]string
 	rootEnabledTasks util.Set
 
 	// completeGraph is the CompleteGraph. We need this to look up the Pipeline, etc.
 	completeGraph *graph.CompleteGraph
 
+	// Map of packageName to pipeline. We resolve task definitions from here
+	// but we don't want to read from the filesystem every time
+	pipelines map[string]fs.Pipeline
+
+	// isSinglePackage is used to load turbo.json correctly
+	isSinglePackage bool
 }
 
 // NewEngine creates a new engine given a topologic graph of workspace package names
-func NewEngine(completeGraph *graph.CompleteGraph) *Engine {
+func NewEngine(
+	completeGraph *graph.CompleteGraph,
+	isSinglePackage bool,
+) *Engine {
 	return &Engine{
 		completeGraph:    completeGraph,
-		Tasks:            make(map[string]*Task),
 		TaskGraph:        &dag.AcyclicGraph{},
 		PackageTaskDeps:  map[string][]string{},
 		rootEnabledTasks: make(util.Set),
+		pipelines:        map[string]fs.Pipeline{},
+		isSinglePackage:  isSinglePackage,
 	}
 }

@@ -58,24 +67,6 @@ type EngineBuildingOptions struct {
 	TasksOnly bool
 }
 
-// Prepare constructs the Task Graph for a list of packages and tasks
-func (e *Engine) Prepare(options *EngineBuildingOptions) error {
-	pkgs := options.Packages
-	tasks := options.TaskNames
-	if len(tasks) == 0 {
-		// TODO(gsoltis): Is this behavior used?
-		for key := range e.Tasks {
-			tasks = append(tasks, key)
-		}
-	}
-
-	if err := e.generateTaskGraph(pkgs, tasks, options.TasksOnly); err != nil {
-		return err
-	}
-
-	return nil
-}
-
 // EngineExecutionOptions controls a single walk of the task graph
 type EngineExecutionOptions struct {
 	// Parallel is whether to run tasks in parallel
@@ -107,31 +98,52 @@ func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error {
 }
 
 func (e *Engine) getTaskDefinition(taskName string, taskID string) (*Task, error) {
-	if task, ok := e.Tasks[taskID]; ok {
-		return task, nil
+	pipeline, err := e.getPipelineFromWorkspace(util.RootPkgName)
+	if err != nil {
+		return nil, err
 	}
-	if task, ok := e.Tasks[taskName]; ok {
-		return task, nil
+
+	if task, ok := pipeline[taskID]; ok {
+		return &Task{
+			Name:           taskName,
+			TaskDefinition: task,
+		}, nil
 	}
+
+	if task, ok := pipeline[taskName]; ok {
+		return &Task{
+			Name:           taskName,
+			TaskDefinition: task,
+		}, nil
+	}
 
 	return nil, fmt.Errorf("Missing task definition, configure \"%s\" or \"%s\" in turbo.json", taskName, taskID)
 }
 
-func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly bool) error {
+// Prepare constructs the Task Graph for a list of packages and tasks
+func (e *Engine) Prepare(options *EngineBuildingOptions) error {
+	pkgs := options.Packages
+	taskNames := options.TaskNames
+	tasksOnly := options.TasksOnly
+
 	traversalQueue := []string{}
 
 	for _, pkg := range pkgs {
 		isRootPkg := pkg == util.RootPkgName
 
 		for _, taskName := range taskNames {
 			// If it's not a task from the root workspace (i.e. tasks from every other workspace)
 			// or if it's a task that we know is a rootEnabled task, add it to the traversal queue.
 			if !isRootPkg || e.rootEnabledTasks.Includes(taskName) {
 				taskID := util.GetTaskId(pkg, taskName)
 				// Skip tasks that don't have a definition
 				if _, err := e.getTaskDefinition(taskName, taskID); err != nil {
-					// Initial, non-package tasks are not required to exist, as long as some
+					// Initially, non-package tasks are not required to exist, as long as some
 					// package in the list packages defines it as a package-task. Dependencies
 					// *are* required to have a definition.
 					continue
 				}
+
 				traversalQueue = append(traversalQueue, taskID)
 			}
 		}
@@ -272,19 +284,14 @@ func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly
 	return nil
 }
 
-// AddTask adds a task to the Engine so it can be looked up later.
-func (e *Engine) AddTask(task *Task) *Engine {
-	// If a root task is added, mark the task name as eligible for
-	// root execution. Otherwise, it will be skipped.
-	if util.IsPackageTask(task.Name) {
-		pkg, taskName := util.GetPackageTaskFromId(task.Name)
+// AddTask adds root tasks to the engine so they can be looked up later.
+func (e *Engine) AddTask(taskName string) {
+	if util.IsPackageTask(taskName) {
+		pkg, taskName := util.GetPackageTaskFromId(taskName)
 		if pkg == util.RootPkgName {
 			e.rootEnabledTasks.Add(taskName)
 		}
 	}
-
-	e.Tasks[task.Name] = task
-	return e
 }
 
 // AddDep adds tuples from+to task ID combos in tuple format so they can be looked up later.
@@ -422,3 +429,43 @@ func (e *Engine) GetTaskGraphDescendants(taskID string) ([]string, error) {
 	sort.Strings(stringDescendents)
 	return stringDescendents, nil
 }
+
+func (e *Engine) getPipelineFromWorkspace(workspaceName string) (fs.Pipeline, error) {
+	cachedPipeline, ok := e.pipelines[workspaceName]
+	if ok {
+		return cachedPipeline, nil
+	}
+
+	// Note: dir for the root workspace will be an empty string, and for
+	// other workspaces, it will be a relative path.
+	dir := e.completeGraph.WorkspaceInfos[workspaceName].Dir
+	repoRoot := e.completeGraph.RepoRoot
+	dirAbsolutePath := dir.RestoreAnchor(repoRoot)
+
+	// We need a PackageJSON, because LoadTurboConfig requires it as an argument
+	// so it can synthesize tasks for single-package repos.
+	// In the root workspace, actually get and use the root package.json.
+	// For all other workspaces, we don't need the synthesis feature, so we can proceed
+	// with a default/blank PackageJSON.
+	pkgJSON := &fs.PackageJSON{}
+
+	if workspaceName == util.RootPkgName {
+		rootPkgJSONPath := dirAbsolutePath.Join("package.json")
+		rootPkgJSON, err := fs.ReadPackageJSON(rootPkgJSONPath)
+		if err != nil {
+			return nil, err
+		}
+		pkgJSON = rootPkgJSON
+	}
+
+	turboConfig, err := fs.LoadTurboConfig(repoRoot, pkgJSON, e.isSinglePackage)
+	if err != nil {
+		return nil, err
+	}
+
+	// Add to the internal cache so we don't have to read the filesystem for every task
+	e.pipelines[workspaceName] = turboConfig.Pipeline
+
+	// Return the config from the workspace.
+	return e.pipelines[workspaceName], nil
+}
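One detail worth calling out in the new `getTaskDefinition`: it checks the pipeline for the package-qualified task ID before falling back to the bare task name, so a workspace-specific entry can override a generic one. A hedged sketch of that precedence rule, using a plain map instead of the real `fs.Pipeline` type (the `pkg#task` ID format follows turbo's convention):

```go
package main

import "fmt"

// lookupTask mirrors the precedence in getTaskDefinition: a package-qualified
// entry such as "web#build" wins over the bare task name "build".
func lookupTask(pipeline map[string]string, taskName, taskID string) (string, error) {
	if def, ok := pipeline[taskID]; ok {
		return def, nil
	}
	if def, ok := pipeline[taskName]; ok {
		return def, nil
	}
	return "", fmt.Errorf("missing task definition, configure %q or %q in turbo.json", taskName, taskID)
}

func main() {
	pipeline := map[string]string{
		"build":     "generic build definition",
		"web#build": "web-specific build definition",
	}

	def, _ := lookupTask(pipeline, "build", "web#build")
	fmt.Println(def) // web-specific build definition: the qualified ID wins

	def, _ = lookupTask(pipeline, "build", "docs#build")
	fmt.Println(def) // generic build definition: falls back to the bare name
}
```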
3 changes: 3 additions & 0 deletions cli/internal/graph/graph.go
@@ -9,6 +9,7 @@ import (
"github.com/pyr-sh/dag"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/nodes"
"github.com/vercel/turbo/cli/internal/turbopath"
"github.com/vercel/turbo/cli/internal/util"
)

@@ -34,6 +35,8 @@ type CompleteGraph struct {

 	// Map of TaskDefinitions by taskID
 	TaskDefinitions map[string]*fs.TaskDefinition
+
+	RepoRoot turbopath.AbsoluteSystemPath
 }
 
 // GetPackageTaskVisitor wraps a `visitor` function that is used for walking the TaskGraph
35 changes: 25 additions & 10 deletions cli/internal/run/run.go
@@ -254,12 +254,16 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {

 	// TODO: consolidate some of these arguments
 	g := &graph.CompleteGraph{
-		WorkspaceGraph: pkgDepGraph.WorkspaceGraph,
+		WorkspaceGraph: pkgDepGraph.WorkspaceGraph,
+		// TODO(mehulkar): We can remove pipeline from here eventually
+		// It is only used by the taskhash tracker to look up taskDefinitions
+		// but we will eventually replace that
 		Pipeline:        pipeline,
 		WorkspaceInfos:  pkgDepGraph.WorkspaceInfos,
 		GlobalHash:      globalHash,
 		RootNode:        pkgDepGraph.RootNode,
+		TaskDefinitions: map[string]*fs.TaskDefinition{},
+		RepoRoot:        r.base.RepoRoot,
 	}
 	rs := &runSpec{
 		Targets: targets,
@@ -268,14 +272,19 @@ }
 	}
 	packageManager := pkgDepGraph.PackageManager
 
-	engine, err := buildTaskGraphEngine(g, rs)
+	engine, err := buildTaskGraphEngine(
+		g,
+		rs,
+		r.opts.runOpts.singlePackage,
+	)
 
 	if err != nil {
 		return errors.Wrap(err, "error preparing engine")
 	}
 	tracker := taskhash.NewTracker(
 		g.RootNode,
 		g.GlobalHash,
+		// TODO(mehulkar): remove g.Pipeline, because we need to get task definitions from CompleteGraph instead
 		g.Pipeline,
 		g.WorkspaceInfos,
 	)
@@ -294,7 +303,11 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
 			g.WorkspaceGraph.RemoveEdge(edge)
 		}
 	}
-	engine, err = buildTaskGraphEngine(g, rs)
+	engine, err = buildTaskGraphEngine(
+		g,
+		rs,
+		r.opts.runOpts.singlePackage,
+	)
 	if err != nil {
 		return errors.Wrap(err, "error preparing engine")
 	}
@@ -389,14 +402,16 @@ func (r *run) initCache(ctx gocontext.Context, rs *runSpec, analyticsClient anal
 	})
 }
 
-func buildTaskGraphEngine(g *graph.CompleteGraph, rs *runSpec) (*core.Engine, error) {
-	engine := core.NewEngine(g)
+func buildTaskGraphEngine(
+	g *graph.CompleteGraph,
+	rs *runSpec,
+	isSinglePackage bool,
+) (*core.Engine, error) {
+	engine := core.NewEngine(g, isSinglePackage)
 
-	for taskName, taskDefinition := range g.Pipeline {
-		engine.AddTask(&core.Task{
-			Name:           taskName,
-			TaskDefinition: taskDefinition,
-		})
+	// Note: g.Pipeline is a map, but this for loop only cares about the keys
+	for taskName := range g.Pipeline {
+		engine.AddTask(taskName)
 	}
 
 	if err := engine.Prepare(&core.EngineBuildingOptions{
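With `AddTask` reduced to recording root-enabled task names, the loop above only tells the engine which tasks may run in the root workspace; everything else is resolved lazily inside `Prepare`. A self-contained sketch of that gating logic, assuming turbo's `//` root-package name and `pkg#task` ID convention (the helpers are simplified stand-ins for `util.IsPackageTask` and friends):

```go
package main

import (
	"fmt"
	"strings"
)

const rootPkgName = "//"

type engine struct {
	rootEnabledTasks map[string]bool
}

// addTask mirrors the new Engine.AddTask: a "//#task" pipeline key marks the
// bare task name as allowed to run in the root workspace.
func (e *engine) addTask(taskName string) {
	if pkg, task, found := strings.Cut(taskName, "#"); found && pkg == rootPkgName {
		e.rootEnabledTasks[task] = true
	}
}

// shouldTraverse mirrors the check in Prepare: non-root packages always
// enqueue the task, while the root package only runs explicitly enabled ones.
func (e *engine) shouldTraverse(pkg, taskName string) bool {
	return pkg != rootPkgName || e.rootEnabledTasks[taskName]
}

func main() {
	e := &engine{rootEnabledTasks: map[string]bool{}}
	for taskName := range map[string]bool{"build": true, "//#format": true} {
		e.addTask(taskName)
	}

	fmt.Println(e.shouldTraverse("web", "build")) // true: non-root packages always qualify
	fmt.Println(e.shouldTraverse("//", "build"))  // false: "build" was never root-enabled
	fmt.Println(e.shouldTraverse("//", "format")) // true: enabled via the "//#format" key
}
```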
