Skip to content

Commit

Permalink
Change export tasks generation to be iterative
Browse files Browse the repository at this point in the history
Still requires memory linear in # of success directories, but makes it easier to work with
iterating over diffs.
  • Loading branch information
arielshaqed committed Nov 4, 2020
1 parent e369100 commit 9dddbac
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 25 deletions.
59 changes: 43 additions & 16 deletions export/tasks_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,43 +224,64 @@ func makeDiffTaskBody(out *parade.TaskData, idGen taskIDGenerator, diff catalog.
return nil
}

// GenerateTasksFromDiffs converts diffs into many tasks that depend on startTaskID, with a
// "generate success" task after generating all files in each directory that matches
// generateSuccessFor.
func GenerateTasksFromDiffs(exportID string, dstPrefix string, diffs catalog.Differences, generateSuccessFor func(path string) bool) ([]parade.TaskData, error) {
const initialSize = 1_000

one := 1 // Number of dependencies of many tasks. This will *not* change.
numTries := 5
// TasksGenerator generates tasks from diffs iteratively.
type TasksGenerator struct {
ExportID string
DstPrefix string
GenerateSuccessFor func(path string) bool
NumTries int

makeDestination func(string) string
idGen taskIDGenerator
successTasksGenerator SuccessTasksTreeGenerator
}

// NewTasksGenerator returns a generator that exports tasks from diffs to file operations under
// dstPrefix. It generates success files for files in directories matched by
// "generateSuccessFor".
func NewTasksGenerator(exportID string, dstPrefix string, generateSuccessFor func(path string) bool) *TasksGenerator {
dstPrefix = strings.TrimRight(dstPrefix, "/")
makeDestination := func(path string) string {
return fmt.Sprintf("%s/%s", dstPrefix, path)
}

idGen := taskIDGenerator(exportID)
return &TasksGenerator{
ExportID: exportID,
DstPrefix: dstPrefix,
GenerateSuccessFor: generateSuccessFor,
NumTries: 5,
makeDestination: makeDestination,
idGen: taskIDGenerator(exportID),
successTasksGenerator: NewSuccessTasksTreeGenerator(
exportID, generateSuccessFor, makeDestination),
}
}

successTasksGenerator := NewSuccessTasksTreeGenerator(
exportID, generateSuccessFor, makeDestination)
// Add translates diffs into many tasks and remembers "generate success" tasks for Finish. It
// returns some tasks that can already be added.
func (e *TasksGenerator) Add(diffs catalog.Differences) ([]parade.TaskData, error) {
const initialSize = 1_000

one := 1 // Number of dependencies of many tasks. This will *not* change.

ret := make([]parade.TaskData, 0, initialSize)

// Create the file operation tasks
// Create file operation tasks to return
for _, diff := range diffs {
if diff.Path == "" {
return nil, fmt.Errorf("no \"Path\" in %+v: %w", diff, ErrMissingColumns)
}

task := parade.TaskData{
StatusCode: parade.TaskPending,
MaxTries: &numTries,
MaxTries: &e.NumTries,
TotalDependencies: &one, // Depends only on a start task
}
err := makeDiffTaskBody(&task, idGen, diff, makeDestination)
err := makeDiffTaskBody(&task, e.idGen, diff, e.makeDestination)
if err != nil {
return ret, err
}
id, err := successTasksGenerator.AddFor(diff.Path)
id, err := e.successTasksGenerator.AddFor(diff.Path)
if err != nil {
return ret, fmt.Errorf("generate tasks after %+v: %w", diff, err)
}
Expand All @@ -269,7 +290,13 @@ func GenerateTasksFromDiffs(exportID string, dstPrefix string, diffs catalog.Dif
ret = append(ret, task)
}

ret = successTasksGenerator.GenerateTasksTo(ret)
return ret, nil
}

// Finish ends tasks generation, releasing any tasks for success and finish.
func (e *TasksGenerator) Finish() ([]parade.TaskData, error) {
ret := make([]parade.TaskData, 0)
ret = e.successTasksGenerator.GenerateTasksTo(ret)

return ret, nil
}
39 changes: 30 additions & 9 deletions export/tasks_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ var zero int = 0
var one int = 1

func TestTasksGenerator_Empty(t *testing.T) {
tasks, err := export.GenerateTasksFromDiffs(
gen := export.NewTasksGenerator(
"empty",
"testfs://prefix/",
catalog.Differences{},
func(_ string) bool { return true })

tasks, err := gen.Finish()
if err != nil {
t.Fatalf("failed to GenerateTasksFromDiffs: %s", err)
}
Expand Down Expand Up @@ -253,15 +254,19 @@ func TestTasksGenerator_Simple(t *testing.T) {
Type: catalog.DifferenceTypeRemoved,
Entry: catalog.Entry{Path: "remove1", PhysicalAddress: "/remove1"},
}}
tasksWithIDs, err := export.GenerateTasksFromDiffs(
gen := export.NewTasksGenerator(
"simple",
"testfs://prefix/",
catalogDiffs,
func(_ string) bool { return false })
tasksWithIDs, err := gen.Add(catalogDiffs)
if err != nil {
t.Fatalf("failed to GenerateTasksFromDiffs: %s", err)
t.Fatalf("failed to add tasks: %s", err)
}

finishTasks, err := gen.Finish()
if err != nil {
t.Fatalf("failed to finish generating tasks: %s", err)
}
tasksWithIDs = append(tasksWithIDs, finishTasks...)
tasks := cleanup(tasksWithIDs)

copyTasks := getTasks(isCopy, tasks)
Expand Down Expand Up @@ -371,15 +376,31 @@ func TestTasksGenerator_SuccessFiles(t *testing.T) {
{before: "foo:delete:/remove5", after: "foo:make-success:a/plain", avoid: true},
{before: "foo:delete:/remove5", after: "foo:finished"},
}
tasksWithIDs, err := export.GenerateTasksFromDiffs(
gen := export.NewTasksGenerator(
"foo",
"testfs://prefix/",
catalogDiffs,
func(path string) bool { return strings.HasSuffix(path, "success") },
)

tasksWithIDs := make([]parade.TaskData, 0, len(catalogDiffs))

for o := 0; o < len(catalogDiffs); o += 3 {
end := o+3
if end > len(catalogDiffs) {
end = len(catalogDiffs)
}
slice := catalogDiffs[o:end]
moreTasks, err := gen.Add(slice)
if err != nil {
t.Fatalf("failed to add tasks %d..%d: %+v: %s", o, end, catalogDiffs[o:o+3], err)
}
tasksWithIDs = append(tasksWithIDs, moreTasks...)
}
moreTasks, err := gen.Finish()
if err != nil {
t.Fatalf("failed to GenerateTasksFromDiffs: %s", err)
t.Fatalf("failed to finish generating tasks: %s", err)
}
tasksWithIDs = append(tasksWithIDs, moreTasks...)

for i, task := range tasksWithIDs {
t.Logf("task %d: %+v", i, task)
Expand Down

0 comments on commit 9dddbac

Please sign in to comment.