Skip to content

Commit

Permalink
Merge pull request #803 from treeverse/feature/534-diff-to-tasks
Browse files Browse the repository at this point in the history
Generate tasks from diffs for exporting
  • Loading branch information
arielshaqed authored Oct 14, 2020
2 parents 1cea346 + d7eb030 commit 6763612
Show file tree
Hide file tree
Showing 10 changed files with 829 additions and 54 deletions.
8 changes: 4 additions & 4 deletions catalog/cataloger_diff_uncommitted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCataloger_DiffUncommitted_Pagination(t *testing.T) {
for i := 0; i < numOfFiles; i++ {
p := "/file" + strconv.Itoa(i)
testCatalogerCreateEntry(t, ctx, c, repository, "master", p, nil, "")
expectedDifferences = append(expectedDifferences, Difference{Type: DifferenceTypeAdded, Path: p})
expectedDifferences = append(expectedDifferences, Difference{Type: DifferenceTypeAdded, Entry: Entry{Path: p}})
}
const changesPerPage = 3
var differences Differences
Expand Down Expand Up @@ -83,9 +83,9 @@ func TestCataloger_DiffUncommitted_Changes(t *testing.T) {
}

changes := Differences{
Difference{Type: DifferenceTypeRemoved, Path: "/file1"},
Difference{Type: DifferenceTypeChanged, Path: "/file2"},
Difference{Type: DifferenceTypeAdded, Path: "/file5"},
Difference{Type: DifferenceTypeRemoved, Entry: Entry{Path: "/file1"}},
Difference{Type: DifferenceTypeChanged, Entry: Entry{Path: "/file2"}},
Difference{Type: DifferenceTypeAdded, Entry: Entry{Path: "/file5"}},
}
if diff := deep.Equal(differences, changes); diff != nil {
t.Fatal("DiffUncommitted", diff)
Expand Down
4 changes: 2 additions & 2 deletions catalog/cataloger_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func TestCataloger_Merge_FromParentConflicts(t *testing.T) {
t.Errorf("Merge reference = %s, expected to be empty", res.Reference)
}
expectedDifferences := Differences{
Difference{Type: DifferenceTypeConflict, Path: "/file2"},
Difference{Type: DifferenceTypeConflict, Path: "/file5"},
Difference{Type: DifferenceTypeConflict, Entry: Entry{Path: "/file2"}},
Difference{Type: DifferenceTypeConflict, Entry: Entry{Path: "/file5"}},
}
if res.Summary[DifferenceTypeConflict] != len(expectedDifferences) {
t.Fatalf("Merge summary conflicts=%d, expected %d", res.Summary[DifferenceTypeConflict], len(expectedDifferences))
Expand Down
4 changes: 2 additions & 2 deletions catalog/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const (
)

type Difference struct {
Type DifferenceType `db:"diff_type"`
Path string `db:"path"`
Entry // Partially filled.
Type DifferenceType `db:"diff_type"`
}

func (d Difference) String() string {
Expand Down
22 changes: 21 additions & 1 deletion docs/assets/js/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ definitions:
format: int32
minProperties: 1

config:
type: object
properties:
blockstore.type:
type: string

paths:

/setup_lakefs:
Expand All @@ -389,7 +395,6 @@ paths:
description: user created successfully
schema:
$ref: "#/definitions/credentials_with_secret"
description: first user credentials
400:
description: bad request
schema:
Expand Down Expand Up @@ -2021,3 +2026,18 @@ paths:
responses:
204:
description: NoContent

/config:
get:
tags:
- config
operationId: getConfig
description: retrieve the lakefs config
responses:
200:
description: the lakefs config
schema:
$ref: "#/definitions/config"
401:
$ref: "#/responses/Unauthorized"

258 changes: 258 additions & 0 deletions export/tasks_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
package export

import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/parade"
)

// TODO(ariels): replace catalog.Differences with an iterator.

var (
ErrMissingColumns = errors.New("missing columns in differences result")
ErrConflict = errors.New("cannot generate task for conflict in diff")
)

const successFilename = "_lakefs_success"

const (
CopyAction = "export:copy"
DeleteAction = "export:delete"
TouchAction = "export:touch"
DoneAction = "export:done"
)

type CopyData struct {
From string `json:"from"`
To string `json:"to"`
ETag string `json:"etag"` // Empty for now :-(
}

type DeleteData struct {
File string `json:"file"`
}

type SuccessData struct {
File string `json:"file"`
}

// Returns the "dirname" of path: everything up to the last "/" (excluding that slash). If
// there are no slashes, returns an empty string.
func dirname(path string) string {
i := strings.LastIndex(path, "/")
if i == -1 {
return ""
}
return path[0:i]
}

type DirMatchCache struct {
pred func(path string) bool
upMatchCache map[string]*string
}

func (dmc *DirMatchCache) Lookup(filename string) (string, bool) {
dir := filename
var ret *string
for {
dir = dirname(dir)
var ok bool
if ret, ok = dmc.upMatchCache[dir]; ok {
break
}
if dmc.pred(dir) {
copy := dir
ret = &copy
break
}
if dir == "" {
break
}
}
for dir = dirname(filename); dir != ""; dir = dirname(dir) {
dmc.upMatchCache[dir] = ret
if ret != nil && dir == *ret {
break
}
}
if dir == "" {
// Cache empty result at top of tree
dmc.upMatchCache[""] = ret
}

if ret == nil {
return "", false
}
return *ret, true
}

func NewDirMatchCache(pred func(path string) bool) *DirMatchCache {
return &DirMatchCache{pred: pred, upMatchCache: make(map[string]*string)}
}

// GenerateTasksFromDiffs converts diffs into many tasks that depend on startTaskID, with a
// "generate success" task after generating all files in each directory that matches
// generateSuccessFor.
func GenerateTasksFromDiffs(exportID string, dstPrefix string, diffs catalog.Differences, generateSuccessFor func(path string) bool) ([]parade.TaskData, error) {
const initialSize = 1_000

one := 1 // Number of dependencies of many tasks. This will *not* change.
numTries := 5

dstPrefix = strings.TrimRight(dstPrefix, "/")
makeDestination := func(path string) string {
return fmt.Sprintf("%s/%s", dstPrefix, path)
}
makeSuccessTaskID := func(path string) parade.TaskID {
return parade.TaskID(fmt.Sprintf("%s:make-success:%s", exportID, path))
}

finishedTaskID := parade.TaskID(fmt.Sprintf("%s:finished", exportID))
finishedTask := parade.TaskData{
ID: finishedTaskID,
Action: DoneAction,
Body: nil,
StatusCode: parade.TaskPending,
MaxTries: &one,
// TotalDependencies filled in later
}
totalTasks := 0

successDirectoriesCache := NewDirMatchCache(generateSuccessFor)
successForDirectory := make(map[string]struct {
count int
toSignal []parade.TaskID
})

makeTaskForDiff := func(diff catalog.Difference) (parade.TaskData, error) {
var (
body []byte
action string
taskID parade.TaskID
toSignal []parade.TaskID
err error
)

if d, ok := successDirectoriesCache.Lookup(diff.Path); ok {
s := successForDirectory[d]
s.count++
successForDirectory[d] = s

toSignal = append(toSignal, makeSuccessTaskID(d))
}
if len(toSignal) == 0 {
toSignal = []parade.TaskID{finishedTaskID}
}

switch diff.Type {
case catalog.DifferenceTypeAdded, catalog.DifferenceTypeChanged:
data := CopyData{
From: diff.PhysicalAddress,
To: makeDestination(diff.Path),
}
body, err = json.Marshal(data)
if err != nil {
return parade.TaskData{}, fmt.Errorf("%+v: failed to serialize %+v: %w", diff, data, err)
}
taskID = parade.TaskID(fmt.Sprintf("%s:copy:%s", exportID, diff.PhysicalAddress))
action = CopyAction
case catalog.DifferenceTypeRemoved:
data := DeleteData{
File: makeDestination(diff.Path),
}
body, err = json.Marshal(data)
if err != nil {
return parade.TaskData{}, fmt.Errorf("%+v: failed to serialize %+v: %w", diff, data, err)
}
taskID = parade.TaskID(fmt.Sprintf("%s:delete:%s", exportID, diff.PhysicalAddress))
action = DeleteAction
case catalog.DifferenceTypeConflict:
return parade.TaskData{}, fmt.Errorf("%+v: %w", diff, ErrConflict)
}
bodyStr := string(body)
return parade.TaskData{
ID: taskID,
Action: action,
Body: &bodyStr,
StatusCode: parade.TaskPending,
MaxTries: &numTries,
TotalDependencies: &one, // Depends only on a start task
ToSignalAfter: toSignal,
}, nil
}

ret := make([]parade.TaskData, 0, initialSize)

// Create the file operation tasks
for _, diff := range diffs {
if diff.Path == "" {
return nil, fmt.Errorf("no \"Path\" in %+v: %w", diff, ErrMissingColumns)
}

task, err := makeTaskForDiff(diff)
if err != nil {
return nil, err
}
totalTasks++

ret = append(ret, task)
}

// Add higher-level success directories, e.g. "a/b-success" for "a/b-success/c/d-success/x".
todoDirs := make([]string, 0, len(successForDirectory))
for successDirectory := range successForDirectory {
todoDirs = append(todoDirs, successDirectory)
}
for len(todoDirs) > 0 {
var d string
todoDirs, d = todoDirs[:len(todoDirs)-1], todoDirs[len(todoDirs)-1]
if upD, ok := successDirectoriesCache.Lookup(d); ok {
s := successForDirectory[upD]
s.count++
successForDirectory[upD] = s

s = successForDirectory[d]
s.toSignal = append(s.toSignal, makeSuccessTaskID(upD))
successForDirectory[d] = s
}
}

// Create any needed "success file" tasks
for successDirectory, td := range successForDirectory {
successPath := fmt.Sprintf("%s/%s", successDirectory, successFilename)
data := SuccessData{
File: makeDestination(successPath),
}
body, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("%s: failed to serialize %+v: %w", successPath, data, err)
}
bodyStr := string(body)
totalDependencies := td // copy to get new address each time

toSignal := totalDependencies.toSignal
if len(toSignal) == 0 {
toSignal = []parade.TaskID{finishedTaskID}
}

ret = append(ret, parade.TaskData{
ID: makeSuccessTaskID(successDirectory),
Action: TouchAction,
Body: &bodyStr,
StatusCode: parade.TaskPending,
MaxTries: &numTries,
TotalDependencies: &totalDependencies.count,
ToSignalAfter: toSignal,
})
totalTasks++
}

finishedTask.TotalDependencies = &totalTasks
ret = append(ret, finishedTask)

return ret, nil
}
Loading

0 comments on commit 6763612

Please sign in to comment.