Skip to content

Commit

Permalink
in flow/tasks/*: add locks around CUE evaluator until concurrency safe
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Worm <tony@hofstadter.io>
  • Loading branch information
verdverm committed Feb 2, 2022
1 parent 29241a4 commit e247dc0
Show file tree
Hide file tree
Showing 25 changed files with 614 additions and 360 deletions.
2 changes: 1 addition & 1 deletion cmd/cuetils/cmd/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ func init() {
PipelineCmd.SetHelpFunc(help)
PipelineCmd.SetUsageFunc(usage)

}
}
12 changes: 8 additions & 4 deletions examples/pipeline/conditional_001.cue
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ vars: {
which: string @tag(which)
}

secrets: {
user: "foobar" @secret()
}

apicall: {
@pipeline(apicall)
In: string
Expand All @@ -18,10 +22,10 @@ main: {

start: { text: "apicalling" } @task(os.Stdout)

call: apicall & { In: "req.json" } @dummy(call1)
call: apicall & {
In: "req.json"
key: "shhhh" @secret()
}
final: { text: call.r1.resp } @task(os.Stdout,final1)

call2: apicall & { In: "req2.json" } @dummy(call2)
final2: { text: call2.Resp } @task(os.Stdout,final2)

}
54 changes: 34 additions & 20 deletions ideas.md
Original file line number Diff line number Diff line change
@@ -1,46 +1,60 @@
# Todo

- [ ] merge and release

Enhance & expand

- [ ] list tags / secrets
- [ ] obfescate secrets, centralized printing (ensure that is the case in every task / run)
- [ ] locks for tasks
- [x] list Tags
- [x] list secrets
- [x] locks for tasks
- [ ] more task types
- [ ] file append
- [ ] mkdir
- [ ] chan / mailbox
- [ ] memory store / load
- [ ] prevent exit when error in handler pipelines
- [ ] prevent exit when error in handler pipelines?
- [ ] rename pipeline -> run
- [ ] merge and release
- [ ] hof/flow cmd

### Docs...

Rename:
- rename pipeline -> run
- hof/flow cmd
probably hof/docs

Docs...

---
### Other task types:

- async
- [ ] chan / mailbox

Build other things cuetils/run
- msg
- rabbitmq
- kafka
- nats
- k/v
- redis
- mongo
- s3/gcs
- vault
- command line prompt

### Build other things cuetils/run

- save all IRC messages to DB
- bookmarks and hn upvotes
- change my lights
- replace helm (need native topo sort)
- OAuth workflow

---
### More todo, always...

Bookkeeping:
- debug flag to print tasks which run
- stats for tasks & pipelines, chan to central
i/o centralization

- [ ] debug/verbose flag to print tasks which run
- [ ] stats for tasks & pipelines, chan to central
- [ ] obfescate secrets, centralized printing (ensure that is the case in every task / run)

Exec & Serve

- some way to run in background, and then kill / exit later?
- write to file for stdio
- [ ] some way to run in background, and then kill / exit later?
- [ ] write directly to file for stdio, is it a concrete string?

---

Expand Down Expand Up @@ -82,7 +96,7 @@ Go funcs:
CLI:

- Support expression on globs, to select out a field on each file

- move implementation?

### Memory issues

Expand Down
9 changes: 9 additions & 0 deletions pipeline/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ type Context struct {
Value cue.Value
Error error

// Global lock around CUE evaluator
CUELock *sync.Mutex

// map of cue.Values
Memory sync.Map

// map of chan?
Mailbox sync.Map

// channels for
// - stats & progress
}
Expand Down
21 changes: 15 additions & 6 deletions pipeline/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
go_ctx "context"
"fmt"
"os"
"sync"

// "time"

Expand Down Expand Up @@ -50,10 +51,10 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.PipelineFlagpol
// (refactor/pipe/solo)
val := in.Value

// taskCtx, err := buildTaskContext(sharedContex, val, opts, popts)

// (temp), give each own context (created in here), but like ^^^
taskCtx, err := buildTaskContext(val, opts, popts)
// (temp), give each own context (created in here), or maybe by flag? Need at least the shared mutex
taskCtx, err := buildRootContext(val, opts, popts)
// taskCtx, err := buildRootContext(sharedContex, val, opts, popts)
if err != nil {
return nil, err
}
Expand All @@ -66,7 +67,7 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.PipelineFlagpol

// lets just print
if popts.List {
tags, errs := getTags(val)
tags, secrets, errs := getTagsAndSecrets(val)
if len(errs) > 0 {
return nil, fmt.Errorf("in getTags: %v", errs)
}
Expand All @@ -78,7 +79,14 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.PipelineFlagpol
}
fmt.Println()
}

if len(secrets) > 0 {
fmt.Println("secrets:\n==============")
for _, v := range secrets {
path := v.Path()
fmt.Printf("%s: %# v %v\n", path, v, v.Attribute("secret"))
}
fmt.Println()
}

fmt.Println("flows:\n==============")
err = listPipelines(val, opts, popts)
Expand Down Expand Up @@ -128,14 +136,15 @@ var walkOptions = []cue.Option{
cue.Docs(true),
}

func buildTaskContext(val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) (*context.Context, error) {
func buildRootContext(val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) (*context.Context, error) {
// lookup the secret label in val
// and build a filter write for stdout / stderr
c := &context.Context{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
Context: go_ctx.Background(),
CUELock: new(sync.Mutex),
}
return c, nil
}
10 changes: 5 additions & 5 deletions pipeline/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/hofstadter-io/cuetils/structural"
)

func getTags(val cue.Value) (tags []cue.Value, errs []error) {
func getTagsAndSecrets(val cue.Value) (tags []cue.Value, secrets []cue.Value, errs []error) {

// fuction used during tree walk to collect values with tags
collector := func (v cue.Value) bool {
Expand All @@ -21,20 +21,20 @@ func getTags(val cue.Value) (tags []cue.Value, errs []error) {
if attr.NumArgs() == 0 {
err = fmt.Errorf("@tag() has no inner args at %s", v.Path())
errs = append(errs, err)
return false
}
tags = append(tags, v)
return false
}
if attr.Name() == "secret" {
secrets = append(secrets, v)
}
}

return true
}

structural.Walk(val, collector, nil, walkOptions...)


return tags, errs
return tags, secrets, errs
}

func injectTags(val cue.Value, tags []string) (cue.Value, error) {
Expand Down
1 change: 1 addition & 0 deletions pipeline/tasker/tasker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func maybeTask(ctx *context.Context, val cue.Value, attr cue.Attribute) (flow.Ru
// wrap our RunnerFunc with cue/flow RunnerFunc
return flow.RunnerFunc(func(t *flow.Task) error {
c := &context.Context{
CUELock: ctx.CUELock,
Context: t.Context(),
Value: t.Value(),
Stdin: ctx.Stdin,
Expand Down
61 changes: 42 additions & 19 deletions pipeline/tasks/api/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,43 +29,66 @@ func NewCall(val cue.Value) (context.Runner, error) {
func (T *Call) Run(ctx *context.Context) (interface{}, error) {
val := ctx.Value

req := val.LookupPath(cue.ParsePath("req"))
var R *gorequest.SuperAgent
var err error
var res interface{}

R, err := buildRequest(req)
func () error {
ctx.CUELock.Lock()
defer func() {
ctx.CUELock.Unlock()
}()

req := val.LookupPath(cue.ParsePath("req"))

R, err = buildRequest(req)
if err != nil {
return err
}
return nil
}()
if err != nil {
return nil, err
}


actual, err := makeRequest(R)
if err != nil {
return nil, err
}

// TODO, build resp cue.Value from http.Response

body, err := io.ReadAll(actual.Body)
if err != nil {
return nil, err
}

var isString bool
r := val.LookupPath(cue.ParsePath("resp"))
if r.Exists() && r.IncompleteKind() == cue.StringKind {
isString = true
}
func () {
ctx.CUELock.Lock()
defer func() {
ctx.CUELock.Unlock()
}()

// TODO, make response object more interesting
// such as status, headers, body vs json
var resp interface{}
if isString {
resp = string(body)
} else {
resp = val.Context().CompileBytes(body, cue.Filename("resp"))
}
// TODO, build resp cue.Value from http.Response

var isString bool
r := val.LookupPath(cue.ParsePath("resp"))
if r.Exists() && r.IncompleteKind() == cue.StringKind {
isString = true
}

// TODO, make response object more interesting
// such as status, headers, body vs json
var resp interface{}
if isString {
resp = string(body)
} else {
resp = val.Context().CompileBytes(body, cue.Filename("resp"))
}


// Use fill to "return" a result to the workflow engine
res := val.FillPath(cue.ParsePath("resp"), resp)
// Use fill to "return" a result to the workflow engine
res = val.FillPath(cue.ParsePath("resp"), resp)
}()

// fmt.Println("end: API call")
return res, nil
Expand Down
Loading

0 comments on commit e247dc0

Please sign in to comment.