Skip to content

Commit

Permalink
finish s/pipeline/flow/g
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Worm <tony@hofstadter.io>
  • Loading branch information
verdverm committed Feb 2, 2022
1 parent 8952918 commit 4f0dc48
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 45 deletions.
4 changes: 2 additions & 2 deletions cmd/cuetils/cmd/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/spf13/cobra"

"github.com/hofstadter-io/cuetils/cmd/cuetils/flags"
"github.com/hofstadter-io/cuetils/pipeline"
"github.com/hofstadter-io/cuetils/flow"
"github.com/hofstadter-io/cuetils/structural"
)

Expand All @@ -23,7 +23,7 @@ func init() {

func FlowRun(globs []string) (err error) {

results, err := pipeline.Run(globs, &flags.RootPflags, &flags.FlowFlags)
results, err := flow.Run(globs, &flags.RootPflags, &flags.FlowFlags)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions flow/pipe/pipeline.go → flow/flow/flow.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pipe
package flow

import (
"fmt"
Expand Down Expand Up @@ -49,7 +49,7 @@ func (P *Flow) run(val cue.Value) error {

final, err := P.Ctrl.Run(P.Context.Context)

// fmt.Println("pipe(end):", P.path, P.rpath)
// fmt.Println("flow(end):", P.path, P.rpath)
P.Final = final
if err != nil {
s := structural.FormatCueError(err)
Expand Down
26 changes: 13 additions & 13 deletions flow/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/hofstadter-io/cuetils/cmd/cuetils/flags"
"github.com/hofstadter-io/cuetils/flow/context"
"github.com/hofstadter-io/cuetils/flow/pipe"
"github.com/hofstadter-io/cuetils/flow/flow"
"github.com/hofstadter-io/cuetils/structural"
"github.com/hofstadter-io/cuetils/utils"
)
Expand Down Expand Up @@ -107,8 +107,8 @@ func listFlows(val cue.Value, opts *flags.RootPflagpole, popts *flags.FlowFlagp
// maybe this becomes recursive so we can find
// nested flows, but not recurse when we find one
// only recurse when we have something which is not a flow or task
func findFlows(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, popts *flags.FlowFlagpole) ([]*pipe.Flow, error) {
pipes := []*pipe.Flow{}
func findFlows(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, popts *flags.FlowFlagpole) ([]*flow.Flow, error) {
flows := []*flow.Flow{}

// TODO, look for lists?
s, err := val.Struct()
Expand All @@ -124,15 +124,15 @@ func findFlows(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, p
_, found, keep := hasFlowAttr(val, args)
if keep {
// invoke TaskFactory
p, err := pipe.NewFlow(ctx, val)
p, err := flow.NewFlow(ctx, val)
if err != nil {
return pipes, err
return flows, err
}
pipes = append(pipes, p)
flows = append(flows, p)
}

if found {
return pipes, nil
return flows, nil
}

iter := s.Fields(
Expand All @@ -146,11 +146,11 @@ func findFlows(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, p

_, found, keep := hasFlowAttr(v, args)
if keep {
p, err := pipe.NewFlow(ctx, v)
p, err := flow.NewFlow(ctx, v)
if err != nil {
return pipes, err
return flows, err
}
pipes = append(pipes, p)
flows = append(flows, p)
}

// break recursion if flow found
Expand All @@ -161,11 +161,11 @@ func findFlows(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, p
// recurse to search for more flows
ps, err := findFlows(ctx, v, opts, popts)
if err != nil {
return pipes, nil
return flows, nil
}
pipes = append(pipes, ps...)
flows = append(flows, ps...)
}

return pipes, nil
return flows, nil
}

16 changes: 8 additions & 8 deletions flow/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/hofstadter-io/cuetils/cmd/cuetils/flags"
"github.com/hofstadter-io/cuetils/flow/context"
"github.com/hofstadter-io/cuetils/flow/pipe"
"github.com/hofstadter-io/cuetils/flow/flow"
_ "github.com/hofstadter-io/cuetils/flow/tasks" // ensure tasks register
"github.com/hofstadter-io/cuetils/structural"
// "github.com/hofstadter-io/cuetils/utils"
Expand Down Expand Up @@ -44,11 +44,11 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.FlowFlagpole) (

// sharedCtx := buildSharedContext

// (refactor/pipe/many) find flows
pipes := []*pipe.Flow{}
// (refactor/flow/many) find flows
flows := []*flow.Flow{}
for _, in := range ins {

// (refactor/pipe/solo)
// (refactor/flow/solo)
val := in.Value


Expand Down Expand Up @@ -101,21 +101,21 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.FlowFlagpole) (
if err != nil {
return nil, err
}
pipes = append(pipes, ps...)
flows = append(flows, ps...)
}

if popts.List {
return nil, nil
}

if len(pipes) == 0 {
if len(flows) == 0 {
return nil, fmt.Errorf("no flows found")
}

// start all of the flows
// TODO, use wait group, accume errors, flag for failure modes
for _, pipe := range pipes {
err := pipe.Start()
for _, flow := range flows {
err := flow.Start()
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions flow/tasks/api/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/labstack/echo-contrib/prometheus"

"github.com/hofstadter-io/cuetils/flow/context"
"github.com/hofstadter-io/cuetils/flow/pipe"
"github.com/hofstadter-io/cuetils/flow/flow"
)

func init() {
Expand Down Expand Up @@ -166,7 +166,7 @@ func (T *Serve) routeFromValue(path string, route cue.Value, e *echo.Echo, ctx *
}

if isPipe {
p, err := pipe.NewFlow(ctx, tmp)
p, err := flow.NewFlow(ctx, tmp)
if err != nil {
return err
}
Expand Down
22 changes: 11 additions & 11 deletions flow/tasks/msg/irc-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"gopkg.in/irc.v3"

"github.com/hofstadter-io/cuetils/flow/context"
"github.com/hofstadter-io/cuetils/flow/pipe"
"github.com/hofstadter-io/cuetils/flow/flow"
"github.com/hofstadter-io/cuetils/utils"
)

Expand Down Expand Up @@ -213,11 +213,11 @@ func buildIrcHandler(ct_ctx *context.Context, val cue.Value) (irc.HandlerFunc, e
// is this a flow
errV := v.LookupPath(cue.ParsePath("error"))
respV := v.LookupPath(cue.ParsePath("resp"))
pipeV := v.LookupPath(cue.ParsePath("pipe"))
flowV := v.LookupPath(cue.ParsePath("flow"))

fmt.Println("errV:", errV)
fmt.Println("respV:", respV)
fmt.Println("pipeV:", pipeV)
fmt.Println("flowV:", flowV)

// log any errors
if errV.Exists() {
Expand All @@ -239,37 +239,37 @@ func buildIrcHandler(ct_ctx *context.Context, val cue.Value) (irc.HandlerFunc, e
}

// handle flows
if pipeV.Exists() {
if flowV.Exists() {
// build new value
v := ctx.CompileString("{...}")
v = v.Unify(pipeV)
v = v.Unify(flowV)

p, err := pipe.NewFlow(ct_ctx, v)
p, err := flow.NewFlow(ct_ctx, v)
if err != nil {
fmt.Println("Error(pipe/new):", err)
fmt.Println("Error(flow/new):", err)
return
}

err = p.Start()
if err != nil {
fmt.Println("Error(pipe/run):", err)
fmt.Println("Error(flow/run):", err)
return
}

rV := p.Final.LookupPath(cue.ParsePath("resp"))
if !rV.Exists() {
fmt.Println("Error(pipe/resp): does not exist")
fmt.Println("Error(flow/resp): does not exist")
return
}
s, err := rV.String()
if err != nil {
fmt.Println("Error(pipe/rVstr):", err)
fmt.Println("Error(flow/rVstr):", err)
return
}

// fill in go-irc.Message and then turn that into a string

fmt.Println("sending(pipe/msg):", s)
fmt.Println("sending(flow/msg):", s)
c.Writef("PRIVMSG %s :%s", channel, s)

return
Expand Down
4 changes: 2 additions & 2 deletions flow/tests/txtar/readfile_001.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
exec cuetils flow pipe.cue
exec cuetils flow flow.cue
# cmp stdout golden.stdout

-- pipe.cue --
-- flow.cue --
tasks: {
@flow(readfile)
r: { f: "in.txt", contents: string } @task(os.ReadFile)
Expand Down
4 changes: 2 additions & 2 deletions flow/tests/txtar/readfile_002.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
exec cuetils flow pipe.cue
exec cuetils flow flow.cue
# cmp stdout golden.stdout

-- pipe.cue --
-- flow.cue --
tasks: {
@flow(readfile)
r: { f: "in.json", contents: string } @task(os.ReadFile)
Expand Down
4 changes: 2 additions & 2 deletions flow/tests/txtar/readfile_003.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
! exec cuetils flow pipe.cue
! exec cuetils flow flow.cue
cmp stdout golden.stdout

-- pipe.cue --
-- flow.cue --
tasks: {
@flow(readfile)
r: { f: "in.txt", contents: string } @task(os.ReadFile)
Expand Down
5 changes: 4 additions & 1 deletion ideas.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ Enhance & expand
- [x] file append
- [x] mkdir
- [x] memory store / load
- [ ] rename pipeline -> run
- [x] rename pipeline -> run
- [ ] merge and release

---

- [ ] hof/flow cmd

### Docs...
Expand Down

0 comments on commit 4f0dc48

Please sign in to comment.