Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite relative paths using dyn.Location of the underlying value #1273

Merged
merged 20 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions bundle/config/mutator/translate_paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/notebook"
)

Expand Down Expand Up @@ -150,55 +151,56 @@ func translateNoOp(literal, localFullPath, localRelPath, remotePath string) (str
return localRelPath, nil
}

type transformer struct {
// A directory path relative to which `path` will be transformed
dir string
// A path to transform
path *string
// Name of the config property where the path string is coming from
configPath string
// A function that performs the actual rewriting logic.
fn rewriteFunc
func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) {
out := v.MustString()
err := m.rewritePath(dir, b, &out, fn)
if err != nil {
if target := (&ErrIsNotebook{}); errors.As(err, target) {
return dyn.InvalidValue, fmt.Errorf(`expected a file for "%s" but got a notebook: %w`, p, target)
}
if target := (&ErrIsNotNotebook{}); errors.As(err, target) {
return dyn.InvalidValue, fmt.Errorf(`expected a notebook for "%s" but got a file: %w`, p, target)
}
return dyn.InvalidValue, err
}

return dyn.NewValue(out, v.Location()), nil
}

type transformFunc func(resource any, dir string) *transformer
func (m *translatePaths) rewriteRelativeTo(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dirs []string) (dyn.Value, error) {
var err error

// Apply all matches transformers for the given resource
func (m *translatePaths) applyTransformers(funcs []transformFunc, b *bundle.Bundle, resource any, dir string) error {
for _, transformFn := range funcs {
transformer := transformFn(resource, dir)
if transformer == nil {
for _, dir := range dirs {
pietern marked this conversation as resolved.
Show resolved Hide resolved
nv, nerr := m.rewriteValue(b, p, v, fn, dir)
if nerr != nil {
// If we haven't seen an error yet, store the first one.
if err == nil {
err = nerr
}
continue
}

err := m.rewritePath(transformer.dir, b, transformer.path, transformer.fn)
if err != nil {
if target := (&ErrIsNotebook{}); errors.As(err, target) {
return fmt.Errorf(`expected a file for "%s" but got a notebook: %w`, transformer.configPath, target)
}
if target := (&ErrIsNotNotebook{}); errors.As(err, target) {
return fmt.Errorf(`expected a notebook for "%s" but got a file: %w`, transformer.configPath, target)
}
return err
}
return nv, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function performs the fallback. If a rewrite with first dir doesn't work, it tries a rewrite with the second dir.

}

return nil
return dyn.InvalidValue, err
}

func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) error {
m.seen = make(map[string]string)

for _, fn := range []func(*translatePaths, *bundle.Bundle) error{
applyJobTransformers,
applyPipelineTransformers,
applyArtifactTransformers,
} {
err := fn(m, b)
if err != nil {
return err
return b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
var err error
for _, fn := range []func(*bundle.Bundle, dyn.Value) (dyn.Value, error){
m.applyJobTranslations,
m.applyPipelineTranslations,
m.applyArtifactTranslations,
} {
v, err = fn(b, v)
if err != nil {
return dyn.InvalidValue, err
}
}
}

return nil
return v, nil
})
}
59 changes: 30 additions & 29 deletions bundle/config/mutator/translate_paths_artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,40 @@ import (
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/dyn"
)

func transformArtifactPath(resource any, dir string) *transformer {
artifact, ok := resource.(*config.Artifact)
if !ok {
return nil
}

return &transformer{
dir,
&artifact.Path,
"artifacts.path",
translateNoOp,
}
}

func applyArtifactTransformers(m *translatePaths, b *bundle.Bundle) error {
artifactTransformers := []transformFunc{
transformArtifactPath,
}

for key, artifact := range b.Config.Artifacts {
dir, err := artifact.ConfigFileDirectory()
if err != nil {
return fmt.Errorf("unable to determine directory for artifact %s: %w", key, err)
}

err = m.applyTransformers(artifactTransformers, b, artifact, dir)
func (m *translatePaths) applyArtifactTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) {
var err error

// Base pattern to match all artifacts.
base := dyn.NewPattern(
dyn.Key("artifacts"),
dyn.AnyKey(),
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: because artifacts use the translateNoOp translation (which doesn't check if a path exists), the fallback never kicks in. This is why I didn't include fallback for artifact blocks.

We could implement this as long as we check if the directory for the artifact exists, which on its own could cause issues (not sure about the guarantees, though I expect the directory has to exist).


for _, t := range []struct {
pattern dyn.Pattern
fn rewriteFunc
}{
{
base.Append(dyn.Key("path")),
translateNoOp,
},
} {
v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
key := p[1].Key()
dir, err := v.Location().Directory()
if err != nil {
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for artifact %s: %w", key, err)
}

return m.rewriteRelativeTo(b, p, v, t.fn, []string{dir})
})
if err != nil {
return err
return dyn.InvalidValue, err
}
}

return nil
return v, nil
}
179 changes: 70 additions & 109 deletions bundle/config/mutator/translate_paths_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,132 +2,93 @@ package mutator

import (
"fmt"
"slices"

"github.com/databricks/cli/bundle"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/cli/libs/dyn"
)

func transformNotebookTask(resource any, dir string) *transformer {
task, ok := resource.(*jobs.Task)
if !ok || task.NotebookTask == nil {
return nil
}

return &transformer{
dir,
&task.NotebookTask.NotebookPath,
"tasks.notebook_task.notebook_path",
translateNotebookPath,
}
}

func transformSparkTask(resource any, dir string) *transformer {
task, ok := resource.(*jobs.Task)
if !ok || task.SparkPythonTask == nil {
return nil
}

return &transformer{
dir,
&task.SparkPythonTask.PythonFile,
"tasks.spark_python_task.python_file",
translateFilePath,
}
}

func transformWhlLibrary(resource any, dir string) *transformer {
library, ok := resource.(*compute.Library)
if !ok || library.Whl == "" {
return nil
}

return &transformer{
dir,
&library.Whl,
"libraries.whl",
translateNoOp, // Does not convert to remote path but makes sure that nested paths resolved correctly
}
}

func transformDbtTask(resource any, dir string) *transformer {
task, ok := resource.(*jobs.Task)
if !ok || task.DbtTask == nil {
return nil
}

return &transformer{
dir,
&task.DbtTask.ProjectDirectory,
"tasks.dbt_task.project_directory",
translateDirectoryPath,
}
}

func transformSqlFileTask(resource any, dir string) *transformer {
task, ok := resource.(*jobs.Task)
if !ok || task.SqlTask == nil || task.SqlTask.File == nil {
return nil
}

return &transformer{
dir,
&task.SqlTask.File.Path,
"tasks.sql_task.file.path",
translateFilePath,
}
}

func transformJarLibrary(resource any, dir string) *transformer {
library, ok := resource.(*compute.Library)
if !ok || library.Jar == "" {
return nil
}

return &transformer{
dir,
&library.Jar,
"libraries.jar",
translateNoOp, // Does not convert to remote path but makes sure that nested paths resolved correctly
}
}

func applyJobTransformers(m *translatePaths, b *bundle.Bundle) error {
jobTransformers := []transformFunc{
transformNotebookTask,
transformSparkTask,
transformWhlLibrary,
transformJarLibrary,
transformDbtTask,
transformSqlFileTask,
}
func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) {
var fallback = make(map[string]string)
var ignore []string
var err error

for key, job := range b.Config.Resources.Jobs {
dir, err := job.ConfigFileDirectory()
if err != nil {
return fmt.Errorf("unable to determine directory for job %s: %w", key, err)
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
}

// If we cannot resolve the relative path using the [dyn.Value] location itself,
// use the job's location as fallback. This is necessary for backwards compatibility.
fallback[key] = dir

// Do not translate job task paths if using git source
if job.GitSource != nil {
continue
ignore = append(ignore, key)
}
}

for i := 0; i < len(job.Tasks); i++ {
task := &job.Tasks[i]
err := m.applyTransformers(jobTransformers, b, task, dir)
if err != nil {
return err
// Base pattern to match all tasks in all jobs.
base := dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("tasks"),
dyn.AnyIndex(),
)

for _, t := range []struct {
pattern dyn.Pattern
fn rewriteFunc
}{
{
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
translateNotebookPath,
},
{
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
translateFilePath,
},
{
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
translateDirectoryPath,
},
{
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
translateFilePath,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
translateNoOp,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
translateNoOp,
},
} {
v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
key := p[2].Key()

// Skip path translation if the job is using git source.
if slices.Contains(ignore, key) {
return v, nil
}
for j := 0; j < len(task.Libraries); j++ {
library := &task.Libraries[j]
err := m.applyTransformers(jobTransformers, b, library, dir)
if err != nil {
return err
}

dir, err := v.Location().Directory()
if err != nil {
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
}

return m.rewriteRelativeTo(b, p, v, t.fn, []string{
dir,
fallback[key],
})
})
if err != nil {
return dyn.InvalidValue, err
}
}

return nil
return v, nil
}
Loading
Loading