Skip to content

Commit

Permalink
Actions: support storage ID (#8651)
Browse files Browse the repository at this point in the history
* Actions: support storage ID

* Fix tests
  • Loading branch information
N-o-Z authored Feb 13, 2025
1 parent 9115d3b commit 4e569da
Show file tree
Hide file tree
Showing 14 changed files with 259 additions and 239 deletions.
2 changes: 1 addition & 1 deletion pkg/actions/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func marshalEventInformation(actionName, hookID string, record graveler.HookReco
EventTime: now.UTC().Format(time.RFC3339),
ActionName: actionName,
HookID: hookID,
RepositoryID: record.RepositoryID.String(),
RepositoryID: record.Repository.RepositoryID.String(),
BranchID: record.BranchID.String(),
SourceRef: record.SourceRef.String(),
TagID: record.TagID.String(),
Expand Down
16 changes: 9 additions & 7 deletions pkg/actions/hook_output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"context"
"io"
"path"

"github.com/treeverse/lakefs/pkg/graveler"
)

type HookOutputWriter struct {
StorageNamespace string
RunID string
HookRunID string
ActionName string
HookID string
Writer OutputWriter
Repository *graveler.RepositoryRecord
RunID string
HookRunID string
ActionName string
HookID string
Writer OutputWriter
}

const (
Expand All @@ -23,7 +25,7 @@ const (

func (h *HookOutputWriter) OutputWrite(ctx context.Context, reader io.Reader, size int64) error {
name := FormatHookOutputPath(h.RunID, h.HookRunID)
return h.Writer.OutputWrite(ctx, h.StorageNamespace, name, reader, size)
return h.Writer.OutputWrite(ctx, h.Repository, name, reader, size)
}

func FormatHookOutputPath(runID, hookRunID string) string {
Expand Down
40 changes: 26 additions & 14 deletions pkg/actions/hook_output_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@ func TestHookWriter_OutputWritePath(t *testing.T) {
runID := hooks.NewRunID()
hookRunID := hooks.NewRunID()
writer := mock.NewMockOutputWriter(ctrl)
writer.EXPECT().OutputWrite(ctx, storageNamespace, actions.FormatHookOutputPath(runID, hookRunID), contentReader, int64(len(content))).Return(nil)
repositoryRecord := &graveler.RepositoryRecord{
RepositoryID: "someRepo",
Repository: &graveler.Repository{
StorageNamespace: storageNamespace,
},
}
writer.EXPECT().OutputWrite(ctx, repositoryRecord, actions.FormatHookOutputPath(runID, hookRunID), contentReader, int64(len(content))).Return(nil)

w := &actions.HookOutputWriter{
StorageNamespace: storageNamespace,
RunID: runID,
HookID: hookID,
HookRunID: hookRunID,
ActionName: actionName,
Writer: writer,
Repository: repositoryRecord,
RunID: runID,
HookID: hookID,
HookRunID: hookRunID,
ActionName: actionName,
Writer: writer,
}
err := w.OutputWrite(ctx, contentReader, int64(len(content)))
if err != nil {
Expand All @@ -50,17 +56,23 @@ func TestHookWriter_OutputWriteError(t *testing.T) {
hooks := graveler.HooksNoOp{}
runID := hooks.NewRunID()
hookRunID := hooks.NewRunID()
repositoryRecord := &graveler.RepositoryRecord{
RepositoryID: "someRepo",
Repository: &graveler.Repository{
StorageNamespace: "storageNamespace",
},
}
errSomeError := errors.New("some error")
writer := mock.NewMockOutputWriter(ctrl)
writer.EXPECT().OutputWrite(ctx, "storageNamespace", actions.FormatHookOutputPath(runID, hookRunID), gomock.Any(), gomock.Any()).Return(errSomeError)
writer.EXPECT().OutputWrite(ctx, repositoryRecord, actions.FormatHookOutputPath(runID, hookRunID), gomock.Any(), gomock.Any()).Return(errSomeError)

w := &actions.HookOutputWriter{
RunID: runID,
HookRunID: hookRunID,
StorageNamespace: "storageNamespace",
ActionName: "actionName",
HookID: "hookID",
Writer: writer,
RunID: runID,
HookRunID: hookRunID,
Repository: repositoryRecord,
ActionName: "actionName",
HookID: "hookID",
Writer: writer,
}
contentReader := strings.NewReader("content")
err := w.OutputWrite(ctx, contentReader, 10)
Expand Down
6 changes: 3 additions & 3 deletions pkg/actions/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func applyRecord(l *lua.State, actionName, hookID string, record graveler.HookRe
"branch_id": record.BranchID.String(),
"source_ref": record.SourceRef.String(),
"tag_id": record.TagID.String(),
"repository_id": record.RepositoryID.String(),
"storage_namespace": record.StorageNamespace.String(),
"repository_id": record.Repository.RepositoryID.String(),
"storage_namespace": record.Repository.StorageNamespace.String(),
"commit": map[string]interface{}{
"message": record.Commit.Message,
"meta_range_id": record.Commit.MetaRangeID.String(),
Expand Down Expand Up @@ -125,7 +125,7 @@ func (h *LuaHook) Run(ctx context.Context, record graveler.HookRecord, buf *byte
return fmt.Errorf("no endpoint configured, cannot request object: %s: %w", h.ScriptPath, ErrInvalidAction)
}
reqURL, err := url.JoinPath(apiutil.BaseURL,
"repositories", string(record.RepositoryID), "refs", string(record.SourceRef), "objects")
"repositories", string(record.Repository.RepositoryID), "refs", string(record.SourceRef), "objects")
if err != nil {
return err
}
Expand Down
68 changes: 44 additions & 24 deletions pkg/actions/lua_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,17 @@ func TestLuaRun(t *testing.T) {
Username: "user1",
})
err = h.Run(ctx, graveler.HookRecord{
RunID: "abc123",
EventType: graveler.EventTypePreCreateBranch,
RepositoryID: "example123",
StorageNamespace: "local://foo/bar",
SourceRef: "abc123",
BranchID: "my-branch",
RunID: "abc123",
EventType: graveler.EventTypePreCreateBranch,
Repository: &graveler.RepositoryRecord{
RepositoryID: "example123",
Repository: &graveler.Repository{
StorageNamespace: "local://foo/bar",
CreationDate: time.Time{},
},
},
SourceRef: "abc123",
BranchID: "my-branch",
Commit: graveler.Commit{
Version: 1,
},
Expand Down Expand Up @@ -141,12 +146,17 @@ func TestLuaRun_NetHttpDisabled(t *testing.T) {
Username: "user1",
})
err = h.Run(ctx, graveler.HookRecord{
RunID: "abc123",
EventType: graveler.EventTypePreCreateBranch,
RepositoryID: "example123",
StorageNamespace: "local://foo/bar",
SourceRef: "abc123",
BranchID: "my-branch",
RunID: "abc123",
EventType: graveler.EventTypePreCreateBranch,
Repository: &graveler.RepositoryRecord{
RepositoryID: "example123",
Repository: &graveler.Repository{
StorageNamespace: "local://foo/bar",
CreationDate: time.Time{},
},
},
SourceRef: "abc123",
BranchID: "my-branch",
Commit: graveler.Commit{
Version: 1,
},
Expand Down Expand Up @@ -291,12 +301,17 @@ print(code .. " " .. body .. " " .. status)
})
runID := nanoid.Must(20)
err = h.Run(ctx, graveler.HookRecord{
RunID: runID,
EventType: graveler.EventTypePreCreateBranch,
RepositoryID: "example123",
StorageNamespace: "local://foo/bar",
SourceRef: "abc123",
BranchID: "my-branch",
RunID: runID,
EventType: graveler.EventTypePreCreateBranch,
Repository: &graveler.RepositoryRecord{
RepositoryID: "example123",
Repository: &graveler.Repository{
StorageNamespace: "local://foo/bar",
CreationDate: time.Time{},
},
},
SourceRef: "abc123",
BranchID: "my-branch",
Commit: graveler.Commit{
Version: 1,
},
Expand Down Expand Up @@ -420,12 +435,17 @@ func TestLuaRunTable(t *testing.T) {
Username: "user1",
})
err = h.Run(ctx, graveler.HookRecord{
RunID: "abc123",
EventType: graveler.EventTypePreCreateBranch,
RepositoryID: "example123",
StorageNamespace: "local://foo/bar",
SourceRef: "abc123",
BranchID: "my-branch",
RunID: "abc123",
EventType: graveler.EventTypePreCreateBranch,
Repository: &graveler.RepositoryRecord{
RepositoryID: "example123",
Repository: &graveler.Repository{
StorageNamespace: "local://foo/bar",
CreationDate: time.Time{},
},
},
SourceRef: "abc123",
BranchID: "my-branch",
Commit: graveler.Commit{
Version: 1,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/actions/mock/mock_actions.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/actions/output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package actions
import (
"context"
"io"

"github.com/treeverse/lakefs/pkg/graveler"
)

type OutputWriter interface {
OutputWrite(ctx context.Context, storageNamespace, name string, reader io.Reader, size int64) error
OutputWrite(ctx context.Context, repository *graveler.RepositoryRecord, name string, reader io.Reader, size int64) error
}
42 changes: 21 additions & 21 deletions pkg/actions/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import (
"sync"
"time"

"github.com/treeverse/lakefs/pkg/actions/lua/hook"

"github.com/antonmedv/expr"
"github.com/hashicorp/go-multierror"
"github.com/treeverse/lakefs/pkg/actions/lua/hook"
"github.com/treeverse/lakefs/pkg/auth"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/kv"
Expand Down Expand Up @@ -212,7 +211,7 @@ type TaskResultIterator interface {
type Service interface {
Stop()
Run(ctx context.Context, record graveler.HookRecord) error
UpdateCommitID(ctx context.Context, repositoryID string, storageNamespace string, runID string, commitID string) error
UpdateCommitID(ctx context.Context, repository *graveler.RepositoryRecord, runID string, commitID string) error
GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error)
GetTaskResult(ctx context.Context, repositoryID string, runID string, hookRunID string) (*TaskResult, error)
ListRunResults(ctx context.Context, repositoryID string, branchID, commitID string, after string) (RunResultIterator, error)
Expand Down Expand Up @@ -317,18 +316,18 @@ func (s *StoreService) allocateTasks(runID string, actions []*Action) ([][]*Task
var tasks [][]*Task
for actionIdx, action := range actions {
var actionTasks []*Task
for hookIdx, hook := range action.Hooks {
h, err := NewHook(hook, action, s.cfg, s.endpoint, s.serverAddress, s.stats)
for hookIdx, actionHook := range action.Hooks {
h, err := NewHook(actionHook, action, s.cfg, s.endpoint, s.serverAddress, s.stats)
if err != nil {
return nil, err
}
task := &Task{
RunID: runID,
HookRunID: NewHookRunID(actionIdx, hookIdx),
Action: action,
HookID: hook.ID,
HookID: actionHook.ID,
Hook: h,
If: hook.If,
If: actionHook.If,
}
// append new task or chain to the last one based on the current action
actionTasks = append(actionTasks, task)
Expand All @@ -347,12 +346,12 @@ func (s *StoreService) runTasks(ctx context.Context, record graveler.HookRecord,
var actionErr error
for _, task := range actionTasks {
hookOutputWriter := &HookOutputWriter{
Writer: s.Writer,
StorageNamespace: record.StorageNamespace.String(),
RunID: task.RunID,
HookRunID: task.HookRunID,
ActionName: task.Action.Name,
HookID: task.HookID,
Writer: s.Writer,
Repository: record.Repository,
RunID: task.RunID,
HookRunID: task.HookRunID,
ActionName: task.Action.Name,
HookID: task.HookID,
}

// evaluate if expression and keep error for later
Expand Down Expand Up @@ -431,23 +430,24 @@ func (s *StoreService) saveRunInformation(ctx context.Context, record graveler.H

manifest := buildRunManifestFromTasks(record, tasks)

err := s.saveRunManifestDB(ctx, record.RepositoryID, manifest)
repositoryID := record.Repository.RepositoryID
err := s.saveRunManifestDB(ctx, repositoryID, manifest)
if err != nil {
return fmt.Errorf("insert run information: %w", err)
}

return s.saveRunManifestObjectStore(ctx, manifest, record.StorageNamespace.String(), record.RunID)
return s.saveRunManifestObjectStore(ctx, manifest, record.Repository, record.RunID)
}

func (s *StoreService) saveRunManifestObjectStore(ctx context.Context, manifest RunManifest, storageNamespace string, runID string) error {
func (s *StoreService) saveRunManifestObjectStore(ctx context.Context, manifest RunManifest, repository *graveler.RepositoryRecord, runID string) error {
manifestJSON, err := json.Marshal(manifest)
if err != nil {
return fmt.Errorf("marshal run manifest: %w", err)
}
runManifestPath := FormatRunManifestOutputPath(runID)
manifestReader := bytes.NewReader(manifestJSON)
manifestSize := int64(len(manifestJSON))
return s.Writer.OutputWrite(ctx, storageNamespace, runManifestPath, manifestReader, manifestSize)
return s.Writer.OutputWrite(ctx, repository, runManifestPath, manifestReader, manifestSize)
}

func (s *StoreService) saveRunManifestDB(ctx context.Context, repositoryID graveler.RepositoryID, manifest RunManifest) error {
Expand Down Expand Up @@ -495,8 +495,8 @@ func buildRunManifestFromTasks(record graveler.HookRecord, tasks [][]*Task) RunM
}

// UpdateCommitID assume record is a post event, we use the PreRunID to update the commit_id and save the run manifest again
func (s *StoreService) UpdateCommitID(ctx context.Context, repositoryID string, storageNamespace string, runID string, commitID string) error {
manifest, err := s.Store.UpdateCommitID(ctx, repositoryID, runID, commitID)
func (s *StoreService) UpdateCommitID(ctx context.Context, repository *graveler.RepositoryRecord, runID string, commitID string) error {
manifest, err := s.Store.UpdateCommitID(ctx, repository.RepositoryID.String(), runID, commitID)
if err != nil {
return fmt.Errorf("updating commit ID: %w", err)
}
Expand All @@ -505,7 +505,7 @@ func (s *StoreService) UpdateCommitID(ctx context.Context, repositoryID string,
}

// update manifest
return s.saveRunManifestObjectStore(ctx, *manifest, storageNamespace, runID)
return s.saveRunManifestObjectStore(ctx, *manifest, repository, runID)
}

func (s *StoreService) GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error) {
Expand All @@ -530,7 +530,7 @@ func (s *StoreService) PreCommitHook(ctx context.Context, record graveler.HookRe

func (s *StoreService) PostCommitHook(ctx context.Context, record graveler.HookRecord) error {
// update pre-commit with commit ID if needed
err := s.UpdateCommitID(ctx, record.RepositoryID.String(), record.StorageNamespace.String(), record.PreRunID, record.CommitID.String())
err := s.UpdateCommitID(ctx, record.Repository, record.PreRunID, record.CommitID.String())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 4e569da

Please sign in to comment.