Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert dag.Step struct to protocol buffers #449

Merged
merged 17 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: 1.19.x


- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Installing protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

- name: Set up Nodejs
uses: actions/setup-node@v3
with:
node-version: 16
node-version: 16

- name: Set up yarn
run: npm install --global yarn
Expand Down Expand Up @@ -42,7 +50,7 @@ jobs:
${{ runner.os }}-go-
- name: Build
run: |
mkdir ./bin && go build -o ./bin/dagu .
mkdir ./bin && protoc -I=./ --go_out=./internal ./internal/proto/*.proto && go build -o ./bin/dagu .
- name: Test
run: |
go test -v -coverprofile="coverage.txt" -covermode=atomic ./...
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ bin/*
dist/
internal/admin/handlers/web/assets/fonts/*
internal/admin/handlers/web/assets/js/*
internal/**/*.pb.go

# NVM
.nvmrc
Expand Down
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
.PHONY: build server scheduler test

### Variables ###
SRC_DIR=./
DST_DIR=$(SRC_DIR)/internal
BUILD_VERSION=$(shell date +'%y%m%d%H%M%S')
LDFLAGS=-X 'main.version=$(BUILD_VERSION)'

Expand All @@ -9,6 +11,11 @@ VERSION=
DOCKER_CMD := docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 --build-arg VERSION=$(VERSION) --push --no-cache

### Commands ###
gen-pb:
protoc -I=$(SRC_DIR) --go_out=$(DST_DIR) $(SRC_DIR)/internal/proto/*.proto

build-bin:
go build -ldflags="$(LDFLAGS)" -o ./bin/dagu .

server:
go build -ldflags="$(LDFLAGS)" -o ./bin/dagu .
Expand All @@ -21,16 +28,13 @@ scheduler: build-dir
build-dir:
@mkdir -p ./bin

build: build-admin build-dir build-bin
build: build-admin build-dir gen-pb build-bin

build-admin:
@cd admin; \
yarn && yarn build
@cp admin/dist/bundle.js ./internal/admin/handlers/web/assets/js/

build-bin:
@go build -ldflags="$(LDFLAGS)" -o ./bin/dagu .

test:
@go test -v ./...

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gotest.tools/v3 v3.4.0 // indirect
)
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -543,6 +544,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
Expand Down
42 changes: 31 additions & 11 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/yohamta/dagu/internal/logger"
"github.com/yohamta/dagu/internal/mailer"
"github.com/yohamta/dagu/internal/models"
"github.com/yohamta/dagu/internal/pb"
"github.com/yohamta/dagu/internal/reporter"
"github.com/yohamta/dagu/internal/scheduler"
"github.com/yohamta/dagu/internal/sock"
Expand Down Expand Up @@ -162,18 +163,37 @@ func (a *Agent) signal(sig os.Signal, allowOverride bool) {

func (a *Agent) init() {
logDir := path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_"))
config := &scheduler.Config{
LogDir: logDir,
MaxActiveRuns: a.DAG.MaxActiveRuns,
Delay: a.DAG.Delay,
Dry: a.Dry,
RequestId: a.requestId,
}

if a.DAG.HandlerOn.Exit != nil {
onExit, _ := pb.ToPbStep(a.DAG.HandlerOn.Exit)
config.OnExit = onExit
}

if a.DAG.HandlerOn.Success != nil {
onSuccess, _ := pb.ToPbStep(a.DAG.HandlerOn.Success)
config.OnSuccess = onSuccess
}

if a.DAG.HandlerOn.Failure != nil {
onFailure, _ := pb.ToPbStep(a.DAG.HandlerOn.Failure)
config.OnFailure = onFailure
}

if a.DAG.HandlerOn.Cancel != nil {
onCancel, _ := pb.ToPbStep(a.DAG.HandlerOn.Cancel)
config.OnCancel = onCancel
}

a.scheduler = &scheduler.Scheduler{
Config: &scheduler.Config{
LogDir: logDir,
MaxActiveRuns: a.DAG.MaxActiveRuns,
Delay: a.DAG.Delay,
Dry: a.Dry,
OnExit: a.DAG.HandlerOn.Exit,
OnSuccess: a.DAG.HandlerOn.Success,
OnFailure: a.DAG.HandlerOn.Failure,
OnCancel: a.DAG.HandlerOn.Cancel,
RequestId: a.requestId,
}}
Config: config,
}
a.reporter = &reporter.Reporter{
Config: &reporter.Config{
Mailer: &mailer.Mailer{
Expand Down
210 changes: 210 additions & 0 deletions internal/pb/step_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package pb

import (
"github.com/yohamta/dagu/internal/dag"
// "google.golang.org/protobuf/encoding/protojson"
"fmt"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
durationpb "google.golang.org/protobuf/types/known/durationpb"
structpb "google.golang.org/protobuf/types/known/structpb"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
)

func ToDagStep(pbStep *Step) (*dag.Step, error) {
if pbStep == nil {
return nil, fmt.Errorf("pbStep must not be nil")
}
dagStep := &dag.Step{
Name: pbStep.Name,
Description: pbStep.Description,
Variables: pbStep.Variables,
Dir: pbStep.Dir,
CmdWithArgs: pbStep.CmdWithArgs,
Command: pbStep.Command,
Script: pbStep.Script,
Stdout: pbStep.Stdout,
Stderr: pbStep.Stderr,
Output: pbStep.Output,
Args: pbStep.Args,
Depends: pbStep.Depends,
MailOnError: pbStep.MailOnError,
SignalOnStop: pbStep.SignalOnStop,
}

if pbStep.ExecutorConfig != nil {
config := make(map[string]interface{}, len(pbStep.ExecutorConfig.Config))
for k, v := range pbStep.ExecutorConfig.Config {
vInterface, err := convertPbAnyToInterface(v)
if err != nil {
return nil, err
}
config[k] = vInterface
}

dagStep.ExecutorConfig = dag.ExecutorConfig{
Type: pbStep.ExecutorConfig.Type,
Config: config,
}
}

if pbStep.ContinueOn != nil {
dagStep.ContinueOn = dag.ContinueOn{
Failure: pbStep.ContinueOn.Failure,
Skipped: pbStep.ContinueOn.Skipped,
}
}

if pbStep.RetryPolicy != nil {
dagStep.RetryPolicy = &dag.RetryPolicy{
Limit: int(pbStep.RetryPolicy.Limit),
Interval: pbStep.RetryPolicy.Interval.AsDuration(),
}
}

if pbStep.RepeatPolicy != nil {
dagStep.RepeatPolicy = dag.RepeatPolicy{
Repeat: pbStep.RepeatPolicy.Repeat,
Interval: pbStep.RepeatPolicy.Interval.AsDuration(),
}
}

if pbStep.Preconditions != nil {
conditions := make([]*dag.Condition, len(pbStep.Preconditions))
for i, c := range pbStep.Preconditions {
conditions[i] = &dag.Condition{
Condition: c.Condition,
Expected: c.Expected,
}
}
dagStep.Preconditions = conditions
}

return dagStep, nil
}

func ToPbStep(dagStep *dag.Step) (*Step, error) {
if dagStep == nil {
return nil, fmt.Errorf("dagStep must not be nil")
}
step := &Step{
Name: dagStep.Name,
Description: dagStep.Description,
Variables: dagStep.Variables,
Dir: dagStep.Dir,
CmdWithArgs: dagStep.CmdWithArgs,
Command: dagStep.Command,
Script: dagStep.Script,
Stdout: dagStep.Stdout,
Stderr: dagStep.Stderr,
Output: dagStep.Output,
Args: dagStep.Args,
Depends: dagStep.Depends,
MailOnError: dagStep.MailOnError,
SignalOnStop: dagStep.SignalOnStop,
}

if &dagStep.ExecutorConfig != nil {
config := make(map[string]*anypb.Any, len(dagStep.ExecutorConfig.Config))
for k, v := range dagStep.ExecutorConfig.Config {
pMsg, err := convertToProtoMessage(v)
if err != nil {
return nil, err
}

any, err := anypb.New(pMsg)
if err != nil {
return nil, err
}

config[k] = any
}
step.ExecutorConfig = &ExecutorConfig{
Type: dagStep.ExecutorConfig.Type,
Config: config,
}
}

if &dagStep.ContinueOn != nil {
step.ContinueOn = &ContinueOn{
Failure: dagStep.ContinueOn.Failure,
Skipped: dagStep.ContinueOn.Skipped,
}
}

if dagStep.RetryPolicy != nil {
step.RetryPolicy = &RetryPolicy{
Limit: int32(dagStep.RetryPolicy.Limit),
Interval: durationpb.New(dagStep.RetryPolicy.Interval),
}
}

if &dagStep.RepeatPolicy != nil {
step.RepeatPolicy = &RepeatPolicy{
Repeat: dagStep.RepeatPolicy.Repeat,
Interval: durationpb.New(dagStep.RepeatPolicy.Interval),
}
}

if dagStep.Preconditions != nil {
conditions := make([]*Condition, len(dagStep.Preconditions))
for i, c := range dagStep.Preconditions {
conditions[i] = &Condition{
Condition: c.Condition,
Expected: c.Expected,
}
}
step.Preconditions = conditions
}

return step, nil
}

func convertPbAnyToInterface(any *anypb.Any) (interface{}, error) {
switch any.TypeUrl {
case "type.googleapis.com/google.protobuf.IntValue":
var intValue wrapperspb.Int32Value
if err := any.UnmarshalTo(&intValue); err != nil {
return nil, fmt.Errorf("could not unmarshal IntValue: %w", err)
}
return intValue.GetValue(), nil
case "type.googleapis.com/google.protobuf.StringValue":
var stringValue wrapperspb.StringValue
if err := any.UnmarshalTo(&stringValue); err != nil {
return nil, fmt.Errorf("could not unmarshal StringValue: %w", err)
}
return stringValue.GetValue(), nil
case "type.googleapis.com/google.protobuf.BoolValue":
var boolValue wrapperspb.BoolValue
if err := any.UnmarshalTo(&boolValue); err != nil {
return nil, fmt.Errorf("could not unmarshal BoolValue: %w", err)
}
return boolValue.GetValue(), nil
case "type.googleapis.com/google.protobuf.Struct":
var structValue structpb.Struct
if err := any.UnmarshalTo(&structValue); err != nil {
return nil, fmt.Errorf("could not unmarshal Struct: %w", err)
}
return structValue.AsMap(), nil
default:
return nil, fmt.Errorf("unknown type URL: %s", any.TypeUrl)
}
}

func convertToProtoMessage(v interface{}) (proto.Message, error) {
switch value := v.(type) {
case string:
return wrapperspb.String(value), nil
case int:
return wrapperspb.Int32(int32(value)), nil
case int32:
return wrapperspb.Int32(value), nil
case bool:
return wrapperspb.Bool(value), nil
case map[string]interface{}:
return structpb.NewStruct(value)
default:
return nil, fmt.Errorf("unsupported type: %T", v)
}
}
Loading