diff --git a/examples/pipeline/conditional_001.cue b/examples/pipeline/conditional_001.cue index 14efb9c..85725f6 100644 --- a/examples/pipeline/conditional_001.cue +++ b/examples/pipeline/conditional_001.cue @@ -1,5 +1,9 @@ import "encoding/json" +vars: { + which: string @tag(which) +} + apicall: { @pipeline(apicall) In: string diff --git a/ideas.md b/ideas.md index 960aede..4e676d7 100644 --- a/ideas.md +++ b/ideas.md @@ -1,67 +1,38 @@ -# TODO +# Todo -Docs... +- [ ] merge and release + +Enhance & expand + +- [ ] list tags / secrets +- [ ] obfescate secrets, centralized printing (ensure that is the case in every task / run) +- [ ] locks for tasks +- [ ] more task types + - [ ] file append + - [ ] mkdir + - [ ] chan / mailbox + - [ ] memory store / load + - [ ] prevent exit when error in handler pipelines -rename pipeline -> run,dag + +Rename: +- rename pipeline -> run +- hof/flow cmd + +Docs... --- Build other things cuetils/run +- save all IRC messages to DB - bookmarks and hn upvotes +- change my lights +- replace helm (need native topo sort) +- OAuth workflow --- -- auth / oauth - - [x] twitch (oauth code) -- [ ] api calls - - [x] update stream title - - [x] twitch (other) - - [ ] update go live message - - [ ] which of my streamers are live -- [x] chat - - [x] twitch-irc - -- [ ] memory store / load -- [ ] prevent exit when error in handler pipelines -- [ ] api.GraphQL but probably better as a wrapper around api.Call ---- - -Get server working: -- [x] wired up routes -- [x] route pipelines - -CLI work: -- [x] list pipelines that can be run -- [x] enable docs for pipelines to be read/written -- [x] know what inputs / outputs of a pipeline are -- revisit tags and get them working - - also print - - find and return path, unify at top? - ---- - -Additional examples: -- auth / oauth - - [ ] twitter - - [x] youtube (apikey) -- [ ] api calls - - [ ] twitter - - [ ] youtube -- [ ] chat - - [ ] slack - - [ ] discord - -Additional Tasks: -- [ ] os.Getwd - -Centralized Printing: -- chan for tasks to write strings to - -Secrets: -- `secret: [string]: string` as secrets to be elided from output -- add as a filter to centralized printing - Bookkeeping: - debug flag to print tasks which run - stats for tasks & pipelines, chan to central @@ -75,8 +46,6 @@ Exec & Serve Then... -- OAuth workflow -- Twitchbot More... @@ -94,13 +63,14 @@ Example Pipeline: Helpers: -- extend (add if not present) - canonicalize (sort fields recursively) +- toposort List processing: - jsonl - yaml with `---` +- CUE got streaming json/yaml support - if extracted value is a list? Go funcs: diff --git a/pipeline/pipelines.go b/pipeline/pipelines.go new file mode 100644 index 0000000..f8dba9a --- /dev/null +++ b/pipeline/pipelines.go @@ -0,0 +1,171 @@ +package pipeline + +import ( + "fmt" + + "cuelang.org/go/cue" + + "github.com/hofstadter-io/cuetils/cmd/cuetils/flags" + "github.com/hofstadter-io/cuetils/pipeline/context" + "github.com/hofstadter-io/cuetils/pipeline/pipe" + "github.com/hofstadter-io/cuetils/structural" + "github.com/hofstadter-io/cuetils/utils" +) + +func hasPipelineAttr(val cue.Value, args []string) (attr cue.Attribute, found, keep bool) { + attrs := val.Attributes(cue.ValueAttr) + + for _, attr := range attrs { + if attr.Name() == "pipeline" { + // found a pipeline, stop recursion + found = true + // if it matches our args, create and append + keep = matchPipeline(attr, args) + if keep { + return attr, true, true + } + } + } + + return cue.Attribute{}, found, false +} + +func matchPipeline(attr cue.Attribute, args []string) (keep bool) { + // fmt.Println("matching 1:", attr, args, len(args), attr.NumArgs()) + // if no args, match pipelines without args + if len(args) == 0 { + if attr.NumArgs() == 0 { + return true + } + // extra check for one arg which is empty + if attr.NumArgs() == 1 { + s, err := attr.String(0) + if err != nil { + fmt.Println("bad pipeline tag:", err) + return false + } + return s == "" + } + + return false + } + + // for now, match any + // upgrade logic for user later + for _, tag := range args { + for p := 0; p < attr.NumArgs(); p++ { + s, err := attr.String(p) + if err != nil { + fmt.Println("bad pipeline tag:", err) + return false + } + if s == tag { + return true + } + } + } + + return false +} + +func listPipelines(val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) (error) { + args := popts.Pipeline + + printer := func(v cue.Value) bool { + attrs := v.Attributes(cue.ValueAttr) + + for _, attr := range attrs { + if attr.Name() == "pipeline" { + if len(args) == 0 || matchPipeline(attr, args) { + if popts.Docs { + s := "" + docs := v.Doc() + for _, d := range docs { + s += d.Text() + } + fmt.Print(s) + } + if opts.Verbose { + s, _ := utils.FormatCue(v) + fmt.Printf("%s: %s\n", v.Path(), s) + } else { + fmt.Println(attr) + } + } + return false + } + } + + return true + } + + structural.Walk(val, printer, nil, walkOptions...) + + return nil +} + +// maybe this becomes recursive so we can find +// nested pipelines, but not recurse when we find one +// only recurse when we have something which is not a pipeline or task +func findPipelines(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) ([]*pipe.Pipeline, error) { + pipes := []*pipe.Pipeline{} + + // TODO, look for lists? + s, err := val.Struct() + if err != nil { + // not a struct, so don't recurse + // fmt.Println("not a struct", err) + return nil, nil + } + + args := popts.Pipeline + + // does our top-level (file-level) have @pipeline() + _, found, keep := hasPipelineAttr(val, args) + if keep { + // invoke TaskFactory + p, err := pipe.NewPipeline(ctx, val) + if err != nil { + return pipes, err + } + pipes = append(pipes, p) + } + + if found { + return pipes, nil + } + + iter := s.Fields( + cue.Attributes(true), + cue.Docs(true), + ) + + // loop over top-level fields in the cue instance + for iter.Next() { + v := iter.Value() + + _, found, keep := hasPipelineAttr(v, args) + if keep { + p, err := pipe.NewPipeline(ctx, v) + if err != nil { + return pipes, err + } + pipes = append(pipes, p) + } + + // break recursion if pipeline found + if found { + continue + } + + // recurse to search for more pipelines + ps, err := findPipelines(ctx, v, opts, popts) + if err != nil { + return pipes, nil + } + pipes = append(pipes, ps...) + } + + return pipes, nil +} + diff --git a/pipeline/run.go b/pipeline/run.go index 6a40a93..843e217 100644 --- a/pipeline/run.go +++ b/pipeline/run.go @@ -4,7 +4,6 @@ import ( go_ctx "context" "fmt" "os" - "strings" // "time" @@ -13,10 +12,10 @@ import ( "github.com/hofstadter-io/cuetils/cmd/cuetils/flags" "github.com/hofstadter-io/cuetils/pipeline/context" - _ "github.com/hofstadter-io/cuetils/pipeline/tasks" // ensure tasks register "github.com/hofstadter-io/cuetils/pipeline/pipe" + _ "github.com/hofstadter-io/cuetils/pipeline/tasks" // ensure tasks register "github.com/hofstadter-io/cuetils/structural" - "github.com/hofstadter-io/cuetils/utils" + // "github.com/hofstadter-io/cuetils/utils" ) /* @@ -65,8 +64,23 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.PipelineFlagpol return nil, err } - var ps []*pipe.Pipeline + // lets just print if popts.List { + tags, errs := getTags(val) + if len(errs) > 0 { + return nil, fmt.Errorf("in getTags: %v", errs) + } + if len(tags) > 0 { + fmt.Println("tags:\n==============") + for _, v := range tags { + path := v.Path() + fmt.Printf("%s: %# v %v\n", path, v, v.Attribute("tag")) + } + fmt.Println() + } + + + fmt.Println("flows:\n==============") err = listPipelines(val, opts, popts) if err != nil { return nil, err @@ -75,7 +89,7 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.PipelineFlagpol continue } - ps, err = findPipelines(taskCtx, val, opts, popts) + ps, err := findPipelines(taskCtx, val, opts, popts) if err != nil { return nil, err } @@ -104,172 +118,6 @@ func run(globs []string, opts *flags.RootPflagpole, popts *flags.PipelineFlagpol return nil, nil } -// maybe this becomes recursive so we can find -// nested pipelines, but not recurse when we find one -// only recurse when we have something which is not a pipeline or task -func findPipelines(ctx *context.Context, val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) ([]*pipe.Pipeline, error) { - pipes := []*pipe.Pipeline{} - - // TODO, look for lists? - s, err := val.Struct() - if err != nil { - // not a struct, so don't recurse - // fmt.Println("not a struct", err) - return nil, nil - } - - args := popts.Pipeline - - // does our top-level (file-level) have @pipeline() - _, found, keep := hasPipelineAttr(val, args) - if keep { - // invoke TaskFactory - p, err := pipe.NewPipeline(ctx, val) - if err != nil { - return pipes, err - } - pipes = append(pipes, p) - } - - if found { - return pipes, nil - } - - iter := s.Fields( - cue.Attributes(true), - cue.Docs(true), - ) - - // loop over top-level fields in the cue instance - for iter.Next() { - v := iter.Value() - - _, found, keep := hasPipelineAttr(v, args) - if keep { - p, err := pipe.NewPipeline(ctx, v) - if err != nil { - return pipes, err - } - pipes = append(pipes, p) - } - - // break recursion if pipeline found - if found { - continue - } - - // recurse to search for more pipelines - ps, err := findPipelines(ctx, v, opts, popts) - if err != nil { - return pipes, nil - } - pipes = append(pipes, ps...) - } - - return pipes, nil -} - -func hasPipelineAttr(val cue.Value, args []string) (attr cue.Attribute, found, keep bool) { - attrs := val.Attributes(cue.ValueAttr) - - for _, attr := range attrs { - if attr.Name() == "pipeline" { - // found a pipeline, stop recursion - found = true - // if it matches our args, create and append - keep = matchPipeline(attr, args) - if keep { - return attr, true, true - } - } - } - - return cue.Attribute{}, found, false -} - -func matchPipeline(attr cue.Attribute, args []string) (keep bool) { - // fmt.Println("matching 1:", attr, args, len(args), attr.NumArgs()) - // if no args, match pipelines without args - if len(args) == 0 { - if attr.NumArgs() == 0 { - return true - } - // extra check for one arg which is empty - if attr.NumArgs() == 1 { - s, err := attr.String(0) - if err != nil { - fmt.Println("bad pipeline tag:", err) - return false - } - return s == "" - } - - return false - } - - // for now, match any - // upgrade logic for user later - for _, tag := range args { - for p := 0; p < attr.NumArgs(); p++ { - s, err := attr.String(p) - if err != nil { - fmt.Println("bad pipeline tag:", err) - return false - } - if s == tag { - return true - } - } - } - - return false -} - -func injectTags(val cue.Value, tags []string) (cue.Value, error) { - tagMap := make(map[string]string) - for _, t := range tags { - fs := strings.SplitN(t, "=", 2) - if len(fs) != 2 { - return val, fmt.Errorf("tags must have form key=value, got %q", t) - } - tagMap[fs[0]] =fs[1] - } - - tagPaths := make(map[string]cue.Path) - errs := []error{} - collector := func (v cue.Value) bool { - attrs := v.Attributes(cue.ValueAttr) - - var err error - for _, attr := range attrs { - if attr.Name() == "tag" { - if attr.NumArgs() == 0 { - err = fmt.Errorf("@tag() has no inner args at %s", v.Path()) - errs = append(errs, err) - return false - } - // TODO, better options &| UX here - arg, _ := attr.String(0) - _, ok := tagMap[arg] - if ok { - tagPaths[arg] = v.Path() - } - - return false - } - } - - return true - } - - structural.Walk(val, collector, nil, walkOptions...) - - for arg, path := range tagPaths { - val = val.FillPath(path, tagMap[arg]) - } - - return val, nil -} var walkOptions = []cue.Option{ cue.Attributes(true), @@ -280,42 +128,6 @@ var walkOptions = []cue.Option{ cue.Docs(true), } -func listPipelines(val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) (error) { - args := popts.Pipeline - - printer := func(v cue.Value) bool { - attrs := v.Attributes(cue.ValueAttr) - - for _, attr := range attrs { - if attr.Name() == "pipeline" { - if len(args) == 0 || matchPipeline(attr, args) { - if popts.Docs { - s := "" - docs := v.Doc() - for _, d := range docs { - s += d.Text() - } - fmt.Print(s) - } - if opts.Verbose { - s, _ := utils.FormatCue(v) - fmt.Printf("%s: %s\n", v.Path(), s) - } else { - fmt.Println(attr) - } - } - return false - } - } - - return true - } - - structural.Walk(val, printer, nil, walkOptions...) - - return nil -} - func buildTaskContext(val cue.Value, opts *flags.RootPflagpole, popts *flags.PipelineFlagpole) (*context.Context, error) { // lookup the secret label in val // and build a filter write for stdout / stderr diff --git a/pipeline/tags.go b/pipeline/tags.go new file mode 100644 index 0000000..3883efb --- /dev/null +++ b/pipeline/tags.go @@ -0,0 +1,85 @@ +package pipeline + +import ( + "fmt" + "strings" + + "cuelang.org/go/cue" + + "github.com/hofstadter-io/cuetils/structural" +) + +func getTags(val cue.Value) (tags []cue.Value, errs []error) { + + // fuction used during tree walk to collect values with tags + collector := func (v cue.Value) bool { + attrs := v.Attributes(cue.ValueAttr) + + var err error + for _, attr := range attrs { + if attr.Name() == "tag" { + if attr.NumArgs() == 0 { + err = fmt.Errorf("@tag() has no inner args at %s", v.Path()) + errs = append(errs, err) + return false + } + tags = append(tags, v) + return false + } + } + + return true + } + + structural.Walk(val, collector, nil, walkOptions...) + + + return tags, errs +} + +func injectTags(val cue.Value, tags []string) (cue.Value, error) { + tagMap := make(map[string]string) + for _, t := range tags { + fs := strings.SplitN(t, "=", 2) + if len(fs) != 2 { + return val, fmt.Errorf("tags must have form key=value, got %q", t) + } + tagMap[fs[0]] =fs[1] + } + + tagPaths := make(map[string]cue.Path) + errs := []error{} + collector := func (v cue.Value) bool { + attrs := v.Attributes(cue.ValueAttr) + + var err error + for _, attr := range attrs { + if attr.Name() == "tag" { + if attr.NumArgs() == 0 { + err = fmt.Errorf("@tag() has no inner args at %s", v.Path()) + errs = append(errs, err) + return false + } + // TODO, better options &| UX here + arg, _ := attr.String(0) + _, ok := tagMap[arg] + if ok { + tagPaths[arg] = v.Path() + } + + return false + } + } + + return true + } + + structural.Walk(val, collector, nil, walkOptions...) + + for arg, path := range tagPaths { + val = val.FillPath(path, tagMap[arg]) + } + + return val, nil +} +