Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

(Phase 1) Multi namespace support #101

Merged
merged 4 commits into from
Jul 27, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/adaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package adaptor

import (
"encoding/json"
// "errors"
"fmt"
"reflect"
"regexp"
"strings"

"github.com/compose/transporter/pkg/pipe"
Expand Down Expand Up @@ -108,6 +108,18 @@ func (c *Config) splitNamespace() (string, string, error) {
return fields[0], fields[1], nil
}

// compileNamespace split's on the first '.' and then compiles the second portion to use as the msg filter
func (c *Config) compileNamespace() (string, *regexp.Regexp, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make things cleaner to have a
type Namespace regexp.Regexp
and then have methods func NewNamespace(s string), and func (ns *Namespace) Match(s) bool

field0, field1, err := c.splitNamespace()

if err != nil {
return "", nil, err
}

compiledNs, err := regexp.Compile(strings.Trim(field1, "/"))
return field0, compiledNs, err
}

// dbConfig is a standard typed config struct to use for as general purpose config for most databases.
type dbConfig struct {
URI string `json:"uri" doc:"the uri to connect to, in the form mongo://user:password@host.com:8080/database"` // the database uri
Expand Down
24 changes: 13 additions & 11 deletions pkg/adaptor/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package adaptor
import (
"fmt"
"net/url"
"regexp"
"strings"

"github.com/compose/transporter/pkg/message"
Expand All @@ -16,8 +17,8 @@ type Elasticsearch struct {
// pull these in from the node
uri *url.URL

_type string
index string
index string
typeMatch *regexp.Regexp

pipe *pipe.Pipe
path string
Expand Down Expand Up @@ -47,9 +48,9 @@ func NewElasticsearch(p *pipe.Pipe, path string, extra Config) (StopStartListene
pipe: p,
}

e.index, e._type, err = extra.splitNamespace()
e.index, e.typeMatch, err = extra.compileNamespace()
if err != nil {
return e, NewError(CRITICAL, path, fmt.Sprintf("can't split namespace into _index._type (%s)", err.Error()), nil)
return e, NewError(CRITICAL, path, fmt.Sprintf("can't split namespace into _index and typeMatch (%s)", err.Error()), nil)
}

return e, nil
Expand Down Expand Up @@ -80,7 +81,7 @@ func (e *Elasticsearch) Listen() error {
}
}()

return e.pipe.Listen(e.applyOp)
return e.pipe.Listen(e.typeMatch, e.applyOp)
}

// Stop the adaptor
Expand Down Expand Up @@ -109,12 +110,17 @@ func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) {
id = ""
}

_, _type, err := msg.SplitNamespace()
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("unable to determine type from msg.Namespace (%s)", msg.Namespace), msg)
return msg, nil
}
switch msg.Op {
case message.Delete:
e.indexer.Delete(e.index, e._type, id, false)
e.indexer.Delete(e.index, _type, id, false)
err = nil
default:
err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false)
err = e.indexer.Index(e.index, _type, id, "", nil, msg.Data, false)
}
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("elasticsearch error (%s)", err), msg.Data)
Expand Down Expand Up @@ -155,7 +161,3 @@ func (e *Elasticsearch) runCommand(msg *message.Msg) error {
}
return nil
}

func (e *Elasticsearch) getNamespace() string {
return strings.Join([]string{e.index, e._type}, ".")
}
5 changes: 3 additions & 2 deletions pkg/adaptor/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"regexp"
"strings"

"github.com/compose/transporter/pkg/message"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (d *File) Listen() (err error) {
}
}

return d.pipe.Listen(d.dumpMessage)
return d.pipe.Listen(regexp.MustCompile(`.*`), d.dumpMessage)
}

// Stop the adaptor
Expand All @@ -89,7 +90,7 @@ func (d *File) readFile() (err error) {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't marshal document (%s)", err.Error()), nil)
return err
}
d.pipe.Send(message.NewMsg(message.Insert, doc))
d.pipe.Send(message.NewMsg(message.Insert, doc, fmt.Sprint("file.%s", filename)))
}
return nil
}
Expand Down
Loading