diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b7be12c61..e7f9f1c11 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -9,11 +9,19 @@ jobs: uses: actions/setup-go@v3 with: go-version: 1.19.x - + + - name: Install Protoc + uses: arduino/setup-protoc@v2 + + - name: Installing protoc-gen-go + run: | + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest + - name: Set up Nodejs uses: actions/setup-node@v3 with: - node-version: 16 + node-version: 16 - name: Set up yarn run: npm install --global yarn @@ -42,7 +50,7 @@ jobs: ${{ runner.os }}-go- - name: Build run: | - mkdir ./bin && go build -o ./bin/dagu . + mkdir ./bin && protoc -I=./ --go_out=./internal ./internal/proto/*.proto && go build -o ./bin/dagu . - name: Test run: | go test -v -coverprofile="coverage.txt" -covermode=atomic ./... diff --git a/.gitignore b/.gitignore index 964e7c02e..9b674ab07 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ bin/* dist/ internal/admin/handlers/web/assets/fonts/* internal/admin/handlers/web/assets/js/* +internal/**/*.pb.go # NVM .nvmrc diff --git a/Makefile b/Makefile index d397c63a4..1d7427189 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ .PHONY: build server scheduler test ### Variables ### +SRC_DIR=./ +DST_DIR=$(SRC_DIR)/internal BUILD_VERSION=$(shell date +'%y%m%d%H%M%S') LDFLAGS=-X 'main.version=$(BUILD_VERSION)' @@ -9,6 +11,11 @@ VERSION= DOCKER_CMD := docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 --build-arg VERSION=$(VERSION) --push --no-cache ### Commands ### +gen-pb: + protoc -I=$(SRC_DIR) --go_out=$(DST_DIR) $(SRC_DIR)/internal/proto/*.proto + +build-bin: + go build -ldflags="$(LDFLAGS)" -o ./bin/dagu . server: go build -ldflags="$(LDFLAGS)" -o ./bin/dagu . @@ -21,16 +28,13 @@ scheduler: build-dir build-dir: @mkdir -p ./bin -build: build-admin build-dir build-bin +build: build-admin build-dir gen-pb build-bin build-admin: @cd admin; \ yarn && yarn build @cp admin/dist/bundle.js ./internal/admin/handlers/web/assets/js/ -build-bin: - @go build -ldflags="$(LDFLAGS)" -o ./bin/dagu . - test: @go test -v ./... diff --git a/go.mod b/go.mod index 97ddfc5e9..5f7317b92 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/tools v0.1.12 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gotest.tools/v3 v3.4.0 // indirect ) diff --git a/go.sum b/go.sum index 12706f6cb..699eddc0d 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -543,6 +544,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= diff --git a/internal/agent/agent.go b/internal/agent/agent.go index ebd812f22..36b362b0f 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -21,6 +21,7 @@ import ( "github.com/yohamta/dagu/internal/logger" "github.com/yohamta/dagu/internal/mailer" "github.com/yohamta/dagu/internal/models" + "github.com/yohamta/dagu/internal/pb" "github.com/yohamta/dagu/internal/reporter" "github.com/yohamta/dagu/internal/scheduler" "github.com/yohamta/dagu/internal/sock" @@ -162,18 +163,37 @@ func (a *Agent) signal(sig os.Signal, allowOverride bool) { func (a *Agent) init() { logDir := path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_")) + config := &scheduler.Config{ + LogDir: logDir, + MaxActiveRuns: a.DAG.MaxActiveRuns, + Delay: a.DAG.Delay, + Dry: a.Dry, + RequestId: a.requestId, + } + + if a.DAG.HandlerOn.Exit != nil { + onExit, _ := pb.ToPbStep(a.DAG.HandlerOn.Exit) + config.OnExit = onExit + } + + if a.DAG.HandlerOn.Success != nil { + onSuccess, _ := pb.ToPbStep(a.DAG.HandlerOn.Success) + config.OnSuccess = onSuccess + } + + if a.DAG.HandlerOn.Failure != nil { + onFailure, _ := pb.ToPbStep(a.DAG.HandlerOn.Failure) + config.OnFailure = onFailure + } + + if a.DAG.HandlerOn.Cancel != nil { + onCancel, _ := pb.ToPbStep(a.DAG.HandlerOn.Cancel) + config.OnCancel = onCancel + } + a.scheduler = &scheduler.Scheduler{ - Config: &scheduler.Config{ - LogDir: logDir, - MaxActiveRuns: a.DAG.MaxActiveRuns, - Delay: a.DAG.Delay, - Dry: a.Dry, - OnExit: a.DAG.HandlerOn.Exit, - OnSuccess: a.DAG.HandlerOn.Success, - OnFailure: a.DAG.HandlerOn.Failure, - OnCancel: a.DAG.HandlerOn.Cancel, - RequestId: a.requestId, - }} + Config: config, + } a.reporter = &reporter.Reporter{ Config: &reporter.Config{ Mailer: &mailer.Mailer{ diff --git a/internal/pb/step_helper.go b/internal/pb/step_helper.go new file mode 100644 index 000000000..0e0b61fd8 --- /dev/null +++ b/internal/pb/step_helper.go @@ -0,0 +1,210 @@ +package pb + +import ( + "github.com/yohamta/dagu/internal/dag" + // "google.golang.org/protobuf/encoding/protojson" + "fmt" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + durationpb "google.golang.org/protobuf/types/known/durationpb" + structpb "google.golang.org/protobuf/types/known/structpb" + wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" +) + +func ToDagStep(pbStep *Step) (*dag.Step, error) { + if pbStep == nil { + return nil, fmt.Errorf("pbStep must not be nil") + } + dagStep := &dag.Step{ + Name: pbStep.Name, + Description: pbStep.Description, + Variables: pbStep.Variables, + Dir: pbStep.Dir, + CmdWithArgs: pbStep.CmdWithArgs, + Command: pbStep.Command, + Script: pbStep.Script, + Stdout: pbStep.Stdout, + Stderr: pbStep.Stderr, + Output: pbStep.Output, + Args: pbStep.Args, + Depends: pbStep.Depends, + MailOnError: pbStep.MailOnError, + SignalOnStop: pbStep.SignalOnStop, + } + + if pbStep.ExecutorConfig != nil { + config := make(map[string]interface{}, len(pbStep.ExecutorConfig.Config)) + for k, v := range pbStep.ExecutorConfig.Config { + vInterface, err := convertPbAnyToInterface(v) + if err != nil { + return nil, err + } + config[k] = vInterface + } + + dagStep.ExecutorConfig = dag.ExecutorConfig{ + Type: pbStep.ExecutorConfig.Type, + Config: config, + } + } + + if pbStep.ContinueOn != nil { + dagStep.ContinueOn = dag.ContinueOn{ + Failure: pbStep.ContinueOn.Failure, + Skipped: pbStep.ContinueOn.Skipped, + } + } + + if pbStep.RetryPolicy != nil { + dagStep.RetryPolicy = &dag.RetryPolicy{ + Limit: int(pbStep.RetryPolicy.Limit), + Interval: pbStep.RetryPolicy.Interval.AsDuration(), + } + } + + if pbStep.RepeatPolicy != nil { + dagStep.RepeatPolicy = dag.RepeatPolicy{ + Repeat: pbStep.RepeatPolicy.Repeat, + Interval: pbStep.RepeatPolicy.Interval.AsDuration(), + } + } + + if pbStep.Preconditions != nil { + conditions := make([]*dag.Condition, len(pbStep.Preconditions)) + for i, c := range pbStep.Preconditions { + conditions[i] = &dag.Condition{ + Condition: c.Condition, + Expected: c.Expected, + } + } + dagStep.Preconditions = conditions + } + + return dagStep, nil +} + +func ToPbStep(dagStep *dag.Step) (*Step, error) { + if dagStep == nil { + return nil, fmt.Errorf("dagStep must not be nil") + } + step := &Step{ + Name: dagStep.Name, + Description: dagStep.Description, + Variables: dagStep.Variables, + Dir: dagStep.Dir, + CmdWithArgs: dagStep.CmdWithArgs, + Command: dagStep.Command, + Script: dagStep.Script, + Stdout: dagStep.Stdout, + Stderr: dagStep.Stderr, + Output: dagStep.Output, + Args: dagStep.Args, + Depends: dagStep.Depends, + MailOnError: dagStep.MailOnError, + SignalOnStop: dagStep.SignalOnStop, + } + + if &dagStep.ExecutorConfig != nil { + config := make(map[string]*anypb.Any, len(dagStep.ExecutorConfig.Config)) + for k, v := range dagStep.ExecutorConfig.Config { + pMsg, err := convertToProtoMessage(v) + if err != nil { + return nil, err + } + + any, err := anypb.New(pMsg) + if err != nil { + return nil, err + } + + config[k] = any + } + step.ExecutorConfig = &ExecutorConfig{ + Type: dagStep.ExecutorConfig.Type, + Config: config, + } + } + + if &dagStep.ContinueOn != nil { + step.ContinueOn = &ContinueOn{ + Failure: dagStep.ContinueOn.Failure, + Skipped: dagStep.ContinueOn.Skipped, + } + } + + if dagStep.RetryPolicy != nil { + step.RetryPolicy = &RetryPolicy{ + Limit: int32(dagStep.RetryPolicy.Limit), + Interval: durationpb.New(dagStep.RetryPolicy.Interval), + } + } + + if &dagStep.RepeatPolicy != nil { + step.RepeatPolicy = &RepeatPolicy{ + Repeat: dagStep.RepeatPolicy.Repeat, + Interval: durationpb.New(dagStep.RepeatPolicy.Interval), + } + } + + if dagStep.Preconditions != nil { + conditions := make([]*Condition, len(dagStep.Preconditions)) + for i, c := range dagStep.Preconditions { + conditions[i] = &Condition{ + Condition: c.Condition, + Expected: c.Expected, + } + } + step.Preconditions = conditions + } + + return step, nil +} + +func convertPbAnyToInterface(any *anypb.Any) (interface{}, error) { + switch any.TypeUrl { + case "type.googleapis.com/google.protobuf.IntValue": + var intValue wrapperspb.Int32Value + if err := any.UnmarshalTo(&intValue); err != nil { + return nil, fmt.Errorf("could not unmarshal IntValue: %w", err) + } + return intValue.GetValue(), nil + case "type.googleapis.com/google.protobuf.StringValue": + var stringValue wrapperspb.StringValue + if err := any.UnmarshalTo(&stringValue); err != nil { + return nil, fmt.Errorf("could not unmarshal StringValue: %w", err) + } + return stringValue.GetValue(), nil + case "type.googleapis.com/google.protobuf.BoolValue": + var boolValue wrapperspb.BoolValue + if err := any.UnmarshalTo(&boolValue); err != nil { + return nil, fmt.Errorf("could not unmarshal BoolValue: %w", err) + } + return boolValue.GetValue(), nil + case "type.googleapis.com/google.protobuf.Struct": + var structValue structpb.Struct + if err := any.UnmarshalTo(&structValue); err != nil { + return nil, fmt.Errorf("could not unmarshal Struct: %w", err) + } + return structValue.AsMap(), nil + default: + return nil, fmt.Errorf("unknown type URL: %s", any.TypeUrl) + } +} + +func convertToProtoMessage(v interface{}) (proto.Message, error) { + switch value := v.(type) { + case string: + return wrapperspb.String(value), nil + case int: + return wrapperspb.Int32(int32(value)), nil + case int32: + return wrapperspb.Int32(value), nil + case bool: + return wrapperspb.Bool(value), nil + case map[string]interface{}: + return structpb.NewStruct(value) + default: + return nil, fmt.Errorf("unsupported type: %T", v) + } +} diff --git a/internal/proto/step.proto b/internal/proto/step.proto new file mode 100644 index 000000000..5e391b7ea --- /dev/null +++ b/internal/proto/step.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; +package models; + +import "google/protobuf/any.proto"; +import "google/protobuf/duration.proto"; +// import "google/protobuf/timestamp.proto"; + +option go_package = "./pb"; + +message Step { + string name = 1; + string description = 2; + repeated string variables = 3; + string dir = 4; + ExecutorConfig executor_config = 5; + string cmd_with_args = 6; + string command = 7; + string script = 8; + string stdout = 9; + string stderr = 10; + string output = 11; + repeated string args = 12; + repeated string depends = 13; + ContinueOn continue_on = 14; + RetryPolicy retry_policy = 15; + RepeatPolicy repeat_policy = 16; + bool mail_on_error = 17; + repeated Condition preconditions = 18; + string signal_on_stop = 19; +} + +message ExecutorConfig { + string type = 1; + map config = 2; +} + +message ContinueOn { + bool failure = 1; + bool skipped = 2; +} + +message RetryPolicy { + int32 limit = 1; + google.protobuf.Duration interval = 2; +} + +message RepeatPolicy { + bool repeat = 1; + google.protobuf.Duration interval = 2; +} + +message Condition { + string condition = 1; + string expected = 2; +} \ No newline at end of file diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 7013e4347..26aef76a5 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -11,6 +11,7 @@ import ( "github.com/yohamta/dagu/internal/config" "github.com/yohamta/dagu/internal/constants" "github.com/yohamta/dagu/internal/dag" + "github.com/yohamta/dagu/internal/pb" ) type SchedulerStatus int @@ -57,10 +58,10 @@ type Config struct { MaxActiveRuns int Delay time.Duration Dry bool - OnExit *dag.Step - OnSuccess *dag.Step - OnFailure *dag.Step - OnCancel *dag.Step + OnExit *pb.Step + OnSuccess *pb.Step + OnFailure *pb.Step + OnCancel *pb.Step RequestId string } @@ -349,16 +350,20 @@ func (sc *Scheduler) setup() (err error) { } sc.handlers = map[string]*Node{} if sc.OnExit != nil { - sc.handlers[constants.OnExit] = &Node{Step: sc.OnExit} + onExit, _ := pb.ToDagStep(sc.OnExit) + sc.handlers[constants.OnExit] = &Node{Step: onExit} } if sc.OnSuccess != nil { - sc.handlers[constants.OnSuccess] = &Node{Step: sc.OnSuccess} + onSuccess, _ := pb.ToDagStep(sc.OnSuccess) + sc.handlers[constants.OnSuccess] = &Node{Step: onSuccess} } if sc.OnFailure != nil { - sc.handlers[constants.OnFailure] = &Node{Step: sc.OnFailure} + onFailure, _ := pb.ToDagStep(sc.OnFailure) + sc.handlers[constants.OnFailure] = &Node{Step: onFailure} } if sc.OnCancel != nil { - sc.handlers[constants.OnCancel] = &Node{Step: sc.OnCancel} + onCancel, _ := pb.ToDagStep(sc.OnCancel) + sc.handlers[constants.OnCancel] = &Node{Step: onCancel} } return } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index db3875edd..bbbb0391d 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -12,6 +12,7 @@ import ( "github.com/yohamta/dagu/internal/config" "github.com/yohamta/dagu/internal/constants" "github.com/yohamta/dagu/internal/dag" + "github.com/yohamta/dagu/internal/pb" "github.com/yohamta/dagu/internal/utils" ) @@ -310,9 +311,10 @@ func TestStepPreCondition(t *testing.T) { } func TestSchedulerOnExit(t *testing.T) { + pbOnExit, _ := pb.ToPbStep(step("onExit", testCommand)) g, sc := newTestSchedule(t, &Config{ - OnExit: step("onExit", testCommand), + OnExit: pbOnExit, }, step("1", testCommand), step("2", testCommand, "1"), @@ -333,9 +335,10 @@ func TestSchedulerOnExit(t *testing.T) { } func TestSchedulerOnExitOnFail(t *testing.T) { + pbOnExit, _ := pb.ToPbStep(step("onExit", testCommand)) g, sc := newTestSchedule(t, &Config{ - OnExit: step("onExit", testCommand), + OnExit: pbOnExit, }, step("1", testCommandFail), step("2", testCommand, "1"), @@ -378,11 +381,14 @@ func TestSchedulerOnSignal(t *testing.T) { } func TestSchedulerOnCancel(t *testing.T) { + pbOnSuccess, _ := pb.ToPbStep(step("onSuccess", testCommand)) + pbOnFailure, _ := pb.ToPbStep(step("onFailure", testCommand)) + pbOnCancel, _ := pb.ToPbStep(step("onCancel", testCommand)) g, sc := newTestSchedule(t, &Config{ - OnSuccess: step("onSuccess", testCommand), - OnFailure: step("onFailure", testCommand), - OnCancel: step("onCancel", testCommand), + OnSuccess: pbOnSuccess, + OnFailure: pbOnFailure, + OnCancel: pbOnCancel, }, step("1", testCommand), step("2", "sleep 60", "1"), @@ -408,11 +414,14 @@ func TestSchedulerOnCancel(t *testing.T) { } func TestSchedulerOnSuccess(t *testing.T) { + pbOnExit, _ := pb.ToPbStep(step("onExit", testCommand)) + pbOnSuccess, _ := pb.ToPbStep(step("onSuccess", testCommand)) + pbOnFailure, _ := pb.ToPbStep(step("onFailure", testCommand)) g, sc := newTestSchedule(t, &Config{ - OnExit: step("onExit", testCommand), - OnSuccess: step("onSuccess", testCommand), - OnFailure: step("onFailure", testCommand), + OnExit: pbOnExit, + OnSuccess: pbOnSuccess, + OnFailure: pbOnFailure, }, step("1", testCommand), ) @@ -428,12 +437,16 @@ func TestSchedulerOnSuccess(t *testing.T) { } func TestSchedulerOnFailure(t *testing.T) { + pbOnExit, _ := pb.ToPbStep(step("onExit", testCommand)) + pbOnSuccess, _ := pb.ToPbStep(step("onSuccess", testCommand)) + pbOnFailure, _ := pb.ToPbStep(step("onFailure", testCommand)) + pbOnCancel, _ := pb.ToPbStep(step("onCancel", testCommand)) g, sc := newTestSchedule(t, &Config{ - OnExit: step("onExit", testCommand), - OnSuccess: step("onSuccess", testCommand), - OnFailure: step("onFailure", testCommand), - OnCancel: step("onCancel", testCommand), + OnExit: pbOnExit, + OnSuccess: pbOnSuccess, + OnFailure: pbOnFailure, + OnCancel: pbOnCancel, }, step("1", testCommandFail), )