Skip to content

Commit

Permalink
print tags when listing available pipelines; split up files in pipeline/
Browse files Browse the repository at this point in the history
  • Loading branch information
verdverm committed Feb 2, 2022
1 parent 081e585 commit 29241a4
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 263 deletions.
4 changes: 4 additions & 0 deletions examples/pipeline/conditional_001.cue
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import "encoding/json"

vars: {
which: string @tag(which)
}

apicall: {
@pipeline(apicall)
In: string
Expand Down
82 changes: 26 additions & 56 deletions ideas.md
Original file line number Diff line number Diff line change
@@ -1,67 +1,38 @@
# TODO
# Todo

Docs...
- [ ] merge and release

Enhance & expand

- [ ] list tags / secrets
- [ ] obfescate secrets, centralized printing (ensure that is the case in every task / run)
- [ ] locks for tasks
- [ ] more task types
- [ ] file append
- [ ] mkdir
- [ ] chan / mailbox
- [ ] memory store / load
- [ ] prevent exit when error in handler pipelines

rename pipeline -> run,dag

Rename:
- rename pipeline -> run
- hof/flow cmd

Docs...

---

Build other things cuetils/run

- save all IRC messages to DB
- bookmarks and hn upvotes
- change my lights
- replace helm (need native topo sort)
- OAuth workflow

---

- auth / oauth
- [x] twitch (oauth code)
- [ ] api calls
- [x] update stream title
- [x] twitch (other)
- [ ] update go live message
- [ ] which of my streamers are live
- [x] chat
- [x] twitch-irc

- [ ] memory store / load
- [ ] prevent exit when error in handler pipelines
- [ ] api.GraphQL but probably better as a wrapper around api.Call
---

Get server working:
- [x] wired up routes
- [x] route pipelines

CLI work:
- [x] list pipelines that can be run
- [x] enable docs for pipelines to be read/written
- [x] know what inputs / outputs of a pipeline are
- revisit tags and get them working
- also print
- find and return path, unify at top?

---

Additional examples:
- auth / oauth
- [ ] twitter
- [x] youtube (apikey)
- [ ] api calls
- [ ] twitter
- [ ] youtube
- [ ] chat
- [ ] slack
- [ ] discord

Additional Tasks:
- [ ] os.Getwd

Centralized Printing:
- chan for tasks to write strings to

Secrets:
- `secret: [string]: string` as secrets to be elided from output
- add as a filter to centralized printing

Bookkeeping:
- debug flag to print tasks which run
- stats for tasks & pipelines, chan to central
Expand All @@ -75,8 +46,6 @@ Exec & Serve

Then...

- OAuth workflow
- Twitchbot

More...

Expand All @@ -94,13 +63,14 @@ Example Pipeline:

Helpers:

- extend (add if not present)
- canonicalize (sort fields recursively)
- toposort

List processing:

- jsonl
- yaml with `---`
- CUE got streaming json/yaml support
- if extracted value is a list?

Go funcs:
Expand Down
171 changes: 171 additions & 0 deletions pipeline/pipelines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package pipeline

import (
"fmt"

"cuelang.org/go/cue"

"github.com/hofstadter-io/cuetils/cmd/cuetils/flags"
"github.com/hofstadter-io/cuetils/pipeline/context"
"github.com/hofstadter-io/cuetils/pipeline/pipe"
"github.com/hofstadter-io/cuetils/structural"
"github.com/hofstadter-io/cuetils/utils"
)

func hasPipelineAttr(val cue.Value, args []string) (attr cue.Attribute, found, keep bool) {
attrs := val.Attributes(cue.ValueAttr)

for _, attr := range attrs {
if attr.Name() == "pipeline" {
// found a pipeline, stop recursion
found = true
// if it matches our args, create and append
keep = matchPipeline(attr, args)
if keep {
return attr, true, true
}
}
}

return cue.Attribute{}, found, false
}

func matchPipeline(attr cue.Attribute, args []string) (keep bool) {
// fmt.Println("matching 1:", attr, args, len(args), attr.NumArgs())
// if no args, match pipelines without args
if len(args) == 0 {
if attr.NumArgs() == 0 {
return true
}
// extra check for one arg which is empty
if attr.NumArgs() == 1 {
s, err := attr.String(0)
if err != nil {
fmt.Println("bad pipeline tag:", err)
return false
}
return s == ""
}

return false
}

// for now, match any
// upgrade logic for user later
for _, tag := range args {
for p := 0; p < attr.NumArgs(); p++ {
s, err := attr.String(p)
if err != nil {
fmt.Println("bad pipeline tag:", err)
return false
}
if s == tag {
return true
}
}
}

return false
}

func listPipelines(val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) (error) {
args := popts.Pipeline

printer := func(v cue.Value) bool {
attrs := v.Attributes(cue.ValueAttr)

for _, attr := range attrs {
if attr.Name() == "pipeline" {
if len(args) == 0 || matchPipeline(attr, args) {
if popts.Docs {
s := ""
docs := v.Doc()
for _, d := range docs {
s += d.Text()
}
fmt.Print(s)
}
if opts.Verbose {
s, _ := utils.FormatCue(v)
fmt.Printf("%s: %s\n", v.Path(), s)
} else {
fmt.Println(attr)
}
}
return false
}
}

return true
}

structural.Walk(val, printer, nil, walkOptions...)

return nil
}

// maybe this becomes recursive so we can find
// nested pipelines, but not recurse when we find one
// only recurse when we have something which is not a pipeline or task
func findPipelines(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) ([]*pipe.Pipeline, error) {
pipes := []*pipe.Pipeline{}

// TODO, look for lists?
s, err := val.Struct()
if err != nil {
// not a struct, so don't recurse
// fmt.Println("not a struct", err)
return nil, nil
}

args := popts.Pipeline

// does our top-level (file-level) have @pipeline()
_, found, keep := hasPipelineAttr(val, args)
if keep {
// invoke TaskFactory
p, err := pipe.NewPipeline(ctx, val)
if err != nil {
return pipes, err
}
pipes = append(pipes, p)
}

if found {
return pipes, nil
}

iter := s.Fields(
cue.Attributes(true),
cue.Docs(true),
)

// loop over top-level fields in the cue instance
for iter.Next() {
v := iter.Value()

_, found, keep := hasPipelineAttr(v, args)
if keep {
p, err := pipe.NewPipeline(ctx, v)
if err != nil {
return pipes, err
}
pipes = append(pipes, p)
}

// break recursion if pipeline found
if found {
continue
}

// recurse to search for more pipelines
ps, err := findPipelines(ctx, v, opts, popts)
if err != nil {
return pipes, nil
}
pipes = append(pipes, ps...)
}

return pipes, nil
}

Loading

0 comments on commit 29241a4

Please sign in to comment.