Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
Merge pull request #101 from compose/multi-namespace
Browse files Browse the repository at this point in the history
(Phase 1) Multi namespace support
  • Loading branch information
jipperinbham committed Jul 27, 2015
2 parents b628daf + e8d2f4c commit 69c9c3b
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 187 deletions.
14 changes: 13 additions & 1 deletion pkg/adaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package adaptor

import (
"encoding/json"
// "errors"
"fmt"
"reflect"
"regexp"
"strings"

"github.com/compose/transporter/pkg/pipe"
Expand Down Expand Up @@ -108,6 +108,18 @@ func (c *Config) splitNamespace() (string, string, error) {
return fields[0], fields[1], nil
}

// compileNamespace split's on the first '.' and then compiles the second portion to use as the msg filter
func (c *Config) compileNamespace() (string, *regexp.Regexp, error) {
field0, field1, err := c.splitNamespace()

if err != nil {
return "", nil, err
}

compiledNs, err := regexp.Compile(strings.Trim(field1, "/"))
return field0, compiledNs, err
}

// dbConfig is a standard typed config struct to use for as general purpose config for most databases.
type dbConfig struct {
URI string `json:"uri" doc:"the uri to connect to, in the form mongo://user:password@host.com:8080/database"` // the database uri
Expand Down
24 changes: 13 additions & 11 deletions pkg/adaptor/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package adaptor
import (
"fmt"
"net/url"
"regexp"
"strings"

"github.com/compose/transporter/pkg/message"
Expand All @@ -16,8 +17,8 @@ type Elasticsearch struct {
// pull these in from the node
uri *url.URL

_type string
index string
index string
typeMatch *regexp.Regexp

pipe *pipe.Pipe
path string
Expand Down Expand Up @@ -47,9 +48,9 @@ func NewElasticsearch(p *pipe.Pipe, path string, extra Config) (StopStartListene
pipe: p,
}

e.index, e._type, err = extra.splitNamespace()
e.index, e.typeMatch, err = extra.compileNamespace()
if err != nil {
return e, NewError(CRITICAL, path, fmt.Sprintf("can't split namespace into _index._type (%s)", err.Error()), nil)
return e, NewError(CRITICAL, path, fmt.Sprintf("can't split namespace into _index and typeMatch (%s)", err.Error()), nil)
}

return e, nil
Expand Down Expand Up @@ -80,7 +81,7 @@ func (e *Elasticsearch) Listen() error {
}
}()

return e.pipe.Listen(e.applyOp)
return e.pipe.Listen(e.applyOp, e.typeMatch)
}

// Stop the adaptor
Expand Down Expand Up @@ -109,12 +110,17 @@ func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) {
id = ""
}

_, _type, err := msg.SplitNamespace()
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("unable to determine type from msg.Namespace (%s)", msg.Namespace), msg)
return msg, nil
}
switch msg.Op {
case message.Delete:
e.indexer.Delete(e.index, e._type, id, false)
e.indexer.Delete(e.index, _type, id, false)
err = nil
default:
err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false)
err = e.indexer.Index(e.index, _type, id, "", nil, msg.Data, false)
}
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("elasticsearch error (%s)", err), msg.Data)
Expand Down Expand Up @@ -155,7 +161,3 @@ func (e *Elasticsearch) runCommand(msg *message.Msg) error {
}
return nil
}

func (e *Elasticsearch) getNamespace() string {
return strings.Join([]string{e.index, e._type}, ".")
}
5 changes: 3 additions & 2 deletions pkg/adaptor/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"regexp"
"strings"

"github.com/compose/transporter/pkg/message"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (d *File) Listen() (err error) {
}
}

return d.pipe.Listen(d.dumpMessage)
return d.pipe.Listen(d.dumpMessage, regexp.MustCompile(`.*`))
}

// Stop the adaptor
Expand All @@ -89,7 +90,7 @@ func (d *File) readFile() (err error) {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't marshal document (%s)", err.Error()), nil)
return err
}
d.pipe.Send(message.NewMsg(message.Insert, doc))
d.pipe.Send(message.NewMsg(message.Insert, doc, fmt.Sprint("file.%s", filename)))
}
return nil
}
Expand Down
Loading

0 comments on commit 69c9c3b

Please sign in to comment.