Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Simplifies the config.yaml and node configuration #36

Merged
merged 4 commits into from
Dec 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cmd/transporter/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *listCommand) Run(args []string) int {
var configFilename string
cmdFlags := flag.NewFlagSet("list", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Help() }
cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file")
cmdFlags.StringVar(&configFilename, "config", "", "config file")
cmdFlags.Parse(args)

config, err := LoadConfig(configFilename)
Expand All @@ -56,7 +56,9 @@ func (c *listCommand) Run(args []string) int {
}
fmt.Printf("%-20s %-15s %s\n", "Name", "Type", "URI")
for n, v := range config.Nodes {
fmt.Printf("%-20s %-15s %s\n", n, v.Type, v.URI)
kind, _ := v["type"].(string)
uri, _ := v["uri"].(string)
fmt.Printf("%-20s %-15s %s\n", n, kind, uri)
}

return 0
Expand Down Expand Up @@ -86,7 +88,7 @@ func (c *runCommand) Run(args []string) int {
var configFilename string
cmdFlags := flag.NewFlagSet("run", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Help() }
cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file")
cmdFlags.StringVar(&configFilename, "config", "", "config file")
cmdFlags.Parse(args)

config, err := LoadConfig(configFilename)
Expand Down Expand Up @@ -136,7 +138,7 @@ func (c *testCommand) Run(args []string) int {
var configFilename string
cmdFlags := flag.NewFlagSet("test", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Help() }
cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file")
cmdFlags.StringVar(&configFilename, "config", "", "config file")
cmdFlags.Parse(args)

config, err := LoadConfig(configFilename)
Expand Down Expand Up @@ -182,7 +184,7 @@ func (c *evalCommand) Run(args []string) int {
var configFilename string
cmdFlags := flag.NewFlagSet("run", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Help() }
cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file")
cmdFlags.StringVar(&configFilename, "config", "", "config file")
cmdFlags.Parse(args)

config, err := LoadConfig(configFilename)
Expand Down
10 changes: 5 additions & 5 deletions cmd/transporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ type Config struct {
Key string `json:"key" yaml:"key"` // http basic auth password to send with each event
Pid string `json:"pid" yaml:"pid"` // http basic auth username to send with each event
} `json:"api" yaml:"api"`
Nodes map[string]struct {
Type string `json:"type" yaml:"type"`
URI string `json:"uri" yaml:"uri"`
}
Nodes map[string]map[string]interface{}
}

// LoadConfig loads a config yaml from a file on disk.
// if the pid is not set in the yaml, pull it from the environment TRANSPORTER_PID.
// if that env var isn't present, then generate a pid
func LoadConfig(filename string) (config Config, err error) {
if filename == "" {
return
if _, err := os.Stat("config.yaml"); os.IsNotExist(err) {
return config, nil // return the default config
}
filename = "config.yaml"
}

ba, err := ioutil.ReadFile(filename)
Expand Down
119 changes: 66 additions & 53 deletions cmd/transporter/javascript_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"path/filepath"
"time"

"github.com/compose/transporter/pkg/adaptor"
"github.com/compose/transporter/pkg/events"
"github.com/compose/transporter/pkg/transporter"
"github.com/nu7hatch/gouuid"
Expand Down Expand Up @@ -67,7 +66,7 @@ func (js *JavascriptBuilder) source(call otto.FunctionCall) otto.Value {
return otto.NullValue()
}

node, err := js.findNode(call.Argument(0))
node, err := js.findNode("source", call.Argument(0))
if err != nil {
js.err = fmt.Errorf("source error, %s", err.Error())
return otto.NullValue()
Expand All @@ -87,8 +86,8 @@ func (js *JavascriptBuilder) source(call otto.FunctionCall) otto.Value {

// save adds a sink to the transporter pipeline
// each pipeline can have multiple sinks
func (js *JavascriptBuilder) save(node Node, call otto.FunctionCall) (Node, error) {
thisNode, err := js.findNode(call.Argument(0))
func (js *JavascriptBuilder) save(token string, node Node, call otto.FunctionCall) (Node, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are the function arguments different? Not seeing where the additional token arg is coming from...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, found it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

token is intruduced in the setFunc, so that errors that come from save or from transform can be tagged appropriately

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

thisNode, err := js.findNode(token, call.Argument(0))
if err != nil {
return node, fmt.Errorf("save error, %s", err.Error())
}
Expand All @@ -107,49 +106,28 @@ func (js *JavascriptBuilder) save(node Node, call otto.FunctionCall) (Node, erro

// adds a transform function to the transporter pipeline
// transform takes one argument, which is a path to a transformer file.
func (js *JavascriptBuilder) transform(node Node, call otto.FunctionCall) (Node, error) {
e, err := call.Argument(0).Export()
func (js *JavascriptBuilder) transform(token string, node Node, call otto.FunctionCall) (Node, error) {
transformer, err := js.findNode(token, call.Argument(0))
if err != nil {
return node, err
}

rawMap, ok := e.(map[string]interface{})
if !ok {
return node, fmt.Errorf("transform error. first argument must be an hash. (got %T instead)", e)
return node, fmt.Errorf("save error, %s", err.Error())
}

filename, ok := rawMap["filename"].(string)
if !ok {
filename := transformer.Extra.GetString("filename")
if filename == "" {
return node, fmt.Errorf("transformer config must contain a valid filename key")
}
if !filepath.IsAbs(filename) {
filename = filepath.Join(js.path, filename)
}

debug, ok := rawMap["debug"].(bool)

name, ok := rawMap["name"].(string)
if !(ok) {
u, err := uuid.NewV4()
if err != nil {
return node, fmt.Errorf("transform error. uuid error (%s)", err.Error())
}
name = u.String()
}

transformer, err := NewNode(name, "transformer", adaptor.Config{"filename": filename, "debug": debug})
if err != nil {
return node, fmt.Errorf("transform error. cannot create node (%s)", err.Error())
if !filepath.IsAbs(filename) {
transformer.Extra["filename"] = filepath.Join(js.path, filename)
}

node.Add(&transformer)

return transformer, nil
}

// pipelines in javascript are chainable, you take in a pipeline, and you return a pipeline
// we just generalize some of that logic here
func (js *JavascriptBuilder) setFunc(obj *otto.Object, token string, fn func(Node, otto.FunctionCall) (Node, error)) error {
func (js *JavascriptBuilder) setFunc(obj *otto.Object, token string, fn func(string, Node, otto.FunctionCall) (Node, error)) error {
return obj.Set(token, func(call otto.FunctionCall) otto.Value {
this, _ := call.This.Export()

Expand All @@ -159,7 +137,7 @@ func (js *JavascriptBuilder) setFunc(obj *otto.Object, token string, fn func(Nod
return otto.NullValue()
}

node, err = fn(node, call)
node, err = fn(token, node, call)
if err != nil {
js.err = err
return otto.NullValue()
Expand All @@ -180,34 +158,69 @@ func (js *JavascriptBuilder) setFunc(obj *otto.Object, token string, fn func(Nod

// find the node from the based ont the hash passed in
// the hash needs to at least have a {name: }property
func (js *JavascriptBuilder) findNode(in otto.Value) (n Node, err error) {
func (js *JavascriptBuilder) findNode(token string, in otto.Value) (n Node, err error) {
var (
configOptions map[string]interface{}
givenOptions map[string]interface{}
ok bool
name string
kind string
)

e, err := in.Export()
if err != nil {
return n, err
}

rawMap, ok := e.(map[string]interface{})
if !ok {
return n, fmt.Errorf("first argument must be an hash. (got %T instead)", in)
}

// make sure the hash validates.
// we need a "name" property, and it must be a string
if _, ok := rawMap["name"]; !ok {
return n, fmt.Errorf("hash requires a name")
}
sourceString, ok := rawMap["name"].(string)
if !(ok) {
return n, fmt.Errorf("hash requires a name")
// accept both a json hash and a string as an argument.
// if the arg is a hash, then we should extract the name,
// and pull the node from the yaml, and then merge the given options
// over top of the options presented in the config node.
//
// if the arg is a string, then use that string as the name
// and pull the config node
switch arg := e.(type) {
case map[string]interface{}:
givenOptions = arg
if name, ok = givenOptions["name"].(string); ok {
configOptions, ok = js.config.Nodes[name]
if !ok { // we can't pull in any config options here
configOptions = make(map[string]interface{})
}

for k, v := range givenOptions {
configOptions[k] = v
}
givenOptions = configOptions

} else { // we don't have a name, so lets generate one.
u, err := uuid.NewV4()
if err != nil {
return n, fmt.Errorf("%s error. unable to create uuid (%s)", token, err.Error())
}
name = u.String()
givenOptions["name"] = name
}
case string:
name = arg
givenOptions, ok = js.config.Nodes[name]
if !ok {
return n, fmt.Errorf("%s error. unable to find node '%s'", token, name)
}
}

val, ok := js.config.Nodes[sourceString]
if !ok {
return n, fmt.Errorf("no configured nodes found named %s", sourceString)
if token == "transform" {
// this is a little bit of magic so that
// transformers (which are added by the )
kind = "transformer"
} else {
kind, ok = givenOptions["type"].(string)
if !ok {
return n, fmt.Errorf("%s: hash requires a type field, but no type given", token)
}
}
rawMap["uri"] = val.URI

return NewNode(sourceString, val.Type, rawMap)
return NewNode(name, kind, givenOptions)
}

// emitter examines the config file for api information
Expand Down
2 changes: 1 addition & 1 deletion test/application.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@

// create a pipeline that reads documents from a file, transforms them, and writes them
pipeline = Source({name:"foofile"}).transform({name: "simpletrans", filename: "transformers/passthrough_and_log.js", debug: false}).save({name:"errorfile"})
Source({name:"localmongo", namespace:"boom.foo"}).transform({name: "simpletrans", filename: "transformers/passthrough_and_log.js", debug: false}).save({name:"loosefile", uri:"file:///tmp/foo"})
10 changes: 10 additions & 0 deletions test/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ nodes:
localmongo:
type: mongo
uri: mongodb://localhost/boom
namespace: boom.foo
debug: true
es:
type: elasticsearch
uri: https://nick:darling@haproxy1.dblayer.com:10291/thisgetsignored
Expand All @@ -28,3 +30,11 @@ nodes:
stdout:
type: file
uri: stdout://
rethink1:
type: rethinkdb
uri: rethink://127.0.0.2:28015/
loosefile:
type: file
logtransformer:
filename: test/transformers/passthrough_and_log.js
type: transformer