Skip to content

Commit

Permalink
implement several pipeline/os tasks; general cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
verdverm committed Jan 19, 2022
1 parent 0b3a06e commit 6abd2c7
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 34 deletions.
10 changes: 9 additions & 1 deletion pipeline/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cuelang.org/go/tools/flow"

"github.com/hofstadter-io/cuetils/pipeline/tasks/api"
"github.com/hofstadter-io/cuetils/pipeline/tasks/os"
"github.com/hofstadter-io/cuetils/pipeline/tasks/st"
)

Expand Down Expand Up @@ -68,11 +69,18 @@ func maybeTask(val cue.Value, attr cue.Attribute) (flow.Runner, error) {
case "st/upsert":
return &st.UpsertTask{}, nil

case "os/readfile":
return &os.ReadFileTask{}, nil
case "os/stdin":
return &os.StdinTask{}, nil
case "os/stdout":
return &os.StdoutTask{}, nil

case "api/call":
return &api.CallTask{}, nil

default:
fmt.Println("unknown attribute:", attr)
fmt.Println("unknown task:", attr)
}

return nil, nil
Expand Down
4 changes: 2 additions & 2 deletions pipeline/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func do(in *structural.Input, opts *flags.RootPflagpole) error {
Root: cue.ParsePath("tasks"),
}

fmt.Println("Dagging...")
// fmt.Println("Dagging...")

// create the workflow which will build the task graph
workflow := flow.New(cfg, value, TaskFactory)

fmt.Println("Running...")
// fmt.Println("Running...")

// run our custom workflow
err = workflow.Run(context.Background())
Expand Down
31 changes: 1 addition & 30 deletions pipeline/tasks/api/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type CallTask struct {
Req cue.Value `cue: "#Req"`
Req cue.Value
Ret cue.Value
}

Expand All @@ -25,22 +25,10 @@ func (T *CallTask) Run(t *flow.Task, err error) error {
fmt.Println("Dep error", err)
}

// not sure this is OK, but the value which was used for this task
val := t.Value()

//s, err := utils.FormatCue(val)
//if err != nil {
//fmt.Println("Fmt error", err)
//}
// fmt.Printf("CallTask: %v\n", s)

req := val.LookupPath(cue.ParsePath("#Req"))

// fmt.Printf("req: %v\n", req)

/*****/
// expected := val.LookupPath(cue.ParsePath("resp"))

R, err := buildRequest(req)
if err != nil {
return err
Expand All @@ -56,26 +44,9 @@ func (T *CallTask) Run(t *flow.Task, err error) error {
return err
}

// fmt.Println("body:", body)

// name better based on path in CUE code
resp := val.Context().CompileBytes(body, cue.Filename("resp"))

// resp := actual

//fail := val.LookupPath(cue.ParsePath("fail"))
//failVal, err := fail.Bool()
//if err != nil {
//// likely not found
//failVal = false
//}

//err = checkResponse(T, verbose, actual, expected, failVal)

/*****/

// fmt.Printf("resp: %v\n", resp)

// Use fill to "return" a result to the workflow engine
res := val.FillPath(cue.ParsePath("Resp"), resp)

Expand Down
71 changes: 71 additions & 0 deletions pipeline/tasks/os/readfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package os

import (
"fmt"
g_os "os"

"cuelang.org/go/cue"
"cuelang.org/go/tools/flow"

"github.com/hofstadter-io/cuetils/utils"
)

type ReadFileTask struct {
// might need F to be an object the helps us to
// understand how to load the contents back in
// such as a string, bytes, or a cue struct
F cue.Value // the filename as a cue value
C cue.Value // the file contents
}

func (T* ReadFileTask) Run(t *flow.Task, err error) error {

if err != nil {
fmt.Println("Dep error", err)
}

v := t.Value()

f := v.LookupPath(cue.ParsePath("#F"))

fn, err := f.String()
if err != nil {
return err
}

bs, err := g_os.ReadFile(fn)
if err != nil {
return err
}

// switch on c's type to fill appropriately
c := v.LookupPath(cue.ParsePath("Contents"))

var res cue.Value
switch k := c.IncompleteKind(); k {
case cue.StringKind:
res = v.FillPath(cue.ParsePath("Contents"), string(bs))
case cue.BytesKind:
res = v.FillPath(cue.ParsePath("Contents"), bs)

case cue.StructKind:
ctx := v.Context()
c := ctx.CompileBytes(bs)
if c.Err() != nil {
return c.Err()
}
res = v.FillPath(cue.ParsePath("Contents"), c)

default:
return fmt.Errorf("Unsupported Content type in ReadFile task: %q", k)
}


// Use fill to "return" a result to the workflow engine
t.Fill(res)

attr := v.Attribute("print")
err = utils.PrintAttr(attr, res)

return err
}
46 changes: 46 additions & 0 deletions pipeline/tasks/os/stdin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package os

import (
"bufio"
"fmt"
g_os "os"

"cuelang.org/go/cue"
"cuelang.org/go/tools/flow"

"github.com/hofstadter-io/cuetils/utils"
)

type StdinTask struct {}

func (T* StdinTask) Run(t *flow.Task, err error) error {

if err != nil {
fmt.Println("Dep error", err)
}

v := t.Value()

msg := v.LookupPath(cue.ParsePath("#Msg"))
if msg.Err() != nil {
return err
} else if msg.Exists() {
m, err := msg.String()
if err != nil {
return err
}
fmt.Print(m)
}

reader := bufio.NewReader(g_os.Stdin)
text, _ := reader.ReadString('\n')

res := v.FillPath(cue.ParsePath("Contents"), text)
// Use fill to "return" a result to the workflow engine
t.Fill(res)

attr := v.Attribute("print")
err = utils.PrintAttr(attr, v)

return err
}
30 changes: 30 additions & 0 deletions pipeline/tasks/os/stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package os

import (
"fmt"

"cuelang.org/go/cue"
"cuelang.org/go/tools/flow"

"github.com/hofstadter-io/cuetils/utils"
)

type StdoutTask struct {}

func (T* StdoutTask) Run(t *flow.Task, err error) error {

if err != nil {
fmt.Println("Dep error", err)
}

v := t.Value()

o := v.LookupPath(cue.ParsePath("#O"))

fmt.Println(o)

attr := v.Attribute("print")
err = utils.PrintAttr(attr, v)

return err
}
4 changes: 4 additions & 0 deletions test/pipeline/readfile_001.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
tasks: {
@pipeline(readfile)
r: { #F: "readfile_001.txt", Contents: string } @task(os/readfile) @print(Contents)
}
23 changes: 23 additions & 0 deletions test/pipeline/readfile_001.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
exec cuetils pipeline in.cue
# cmp stdout golden.stdout

-- pipe.cue --
tasks: {
@pipeline(readfile)
r1: { #F: "readfile_001.txt", Resp: _ } @task(os/readfile) @print("#Req",Resp)
p1: { #X: r1.Resp, #P: pick } @task(st/pick) @print(Out)
}

-- in.json --
"x": {
"a": {
"b": "B"
},
"b": 1
"c": 2
"d": "D"
}

-- golden.stdout --
t.b.d.

10 changes: 10 additions & 0 deletions test/pipeline/readfile_002.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import "encoding/json"

tasks: {
@pipeline(readfile)
r: { #F: "../tree.json", Contents: string } @task(os/readfile)
j: json.Unmarshal(r.Contents)
p: { #X: j, #P: { tree: cow: _ } } @task(st/pick)

final: { #O: p.Out.tree } @task(os/stdout)
}
7 changes: 7 additions & 0 deletions test/pipeline/stdin_001.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import "strings"

tasks: {
@pipeline(stdin)
input: { #Msg: "enter text: " } @task(os/stdin)
final: { #O: strings.ToUpper(input.Contents) } @task(os/stdout)
}
1 change: 0 additions & 1 deletion utils/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
func PrintAttr(attr cue.Attribute, val cue.Value) error {
// maybe print
if attr.Err() == nil {
fmt.Println("PrintAttr", attr)
for i := 0; i < attr.NumArgs(); i++ {
a, _ := attr.String(i)
v := val.LookupPath(cue.ParsePath(a))
Expand Down

0 comments on commit 6abd2c7

Please sign in to comment.