diff --git a/pkg/adaptor/adaptor.go b/pkg/adaptor/adaptor.go index 3d6628b66..d0ac83249 100644 --- a/pkg/adaptor/adaptor.go +++ b/pkg/adaptor/adaptor.go @@ -2,9 +2,9 @@ package adaptor import ( "encoding/json" - // "errors" "fmt" "reflect" + "regexp" "strings" "github.com/compose/transporter/pkg/pipe" @@ -108,6 +108,18 @@ func (c *Config) splitNamespace() (string, string, error) { return fields[0], fields[1], nil } +// compileNamespace split's on the first '.' and then compiles the second portion to use as the msg filter +func (c *Config) compileNamespace() (string, *regexp.Regexp, error) { + field0, field1, err := c.splitNamespace() + + if err != nil { + return "", nil, err + } + + compiledNs, err := regexp.Compile(strings.Trim(field1, "/")) + return field0, compiledNs, err +} + // dbConfig is a standard typed config struct to use for as general purpose config for most databases. type dbConfig struct { URI string `json:"uri" doc:"the uri to connect to, in the form mongo://user:password@host.com:8080/database"` // the database uri diff --git a/pkg/adaptor/elasticsearch.go b/pkg/adaptor/elasticsearch.go index 3ca7eab0d..914bd9080 100644 --- a/pkg/adaptor/elasticsearch.go +++ b/pkg/adaptor/elasticsearch.go @@ -3,6 +3,7 @@ package adaptor import ( "fmt" "net/url" + "regexp" "strings" "github.com/compose/transporter/pkg/message" @@ -16,8 +17,8 @@ type Elasticsearch struct { // pull these in from the node uri *url.URL - _type string - index string + index string + typeMatch *regexp.Regexp pipe *pipe.Pipe path string @@ -47,9 +48,9 @@ func NewElasticsearch(p *pipe.Pipe, path string, extra Config) (StopStartListene pipe: p, } - e.index, e._type, err = extra.splitNamespace() + e.index, e.typeMatch, err = extra.compileNamespace() if err != nil { - return e, NewError(CRITICAL, path, fmt.Sprintf("can't split namespace into _index._type (%s)", err.Error()), nil) + return e, NewError(CRITICAL, path, fmt.Sprintf("can't split namespace into _index and typeMatch (%s)", err.Error()), nil) } return e, nil @@ -80,7 +81,7 @@ func (e *Elasticsearch) Listen() error { } }() - return e.pipe.Listen(e.applyOp) + return e.pipe.Listen(e.applyOp, e.typeMatch) } // Stop the adaptor @@ -109,12 +110,17 @@ func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) { id = "" } + _, _type, err := msg.SplitNamespace() + if err != nil { + e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("unable to determine type from msg.Namespace (%s)", msg.Namespace), msg) + return msg, nil + } switch msg.Op { case message.Delete: - e.indexer.Delete(e.index, e._type, id, false) + e.indexer.Delete(e.index, _type, id, false) err = nil default: - err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false) + err = e.indexer.Index(e.index, _type, id, "", nil, msg.Data, false) } if err != nil { e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("elasticsearch error (%s)", err), msg.Data) @@ -155,7 +161,3 @@ func (e *Elasticsearch) runCommand(msg *message.Msg) error { } return nil } - -func (e *Elasticsearch) getNamespace() string { - return strings.Join([]string{e.index, e._type}, ".") -} diff --git a/pkg/adaptor/file.go b/pkg/adaptor/file.go index 4f5fd60e3..73eabf5f7 100644 --- a/pkg/adaptor/file.go +++ b/pkg/adaptor/file.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "regexp" "strings" "github.com/compose/transporter/pkg/message" @@ -62,7 +63,7 @@ func (d *File) Listen() (err error) { } } - return d.pipe.Listen(d.dumpMessage) + return d.pipe.Listen(d.dumpMessage, regexp.MustCompile(`.*`)) } // Stop the adaptor @@ -89,7 +90,7 @@ func (d *File) readFile() (err error) { d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't marshal document (%s)", err.Error()), nil) return err } - d.pipe.Send(message.NewMsg(message.Insert, doc)) + d.pipe.Send(message.NewMsg(message.Insert, doc, fmt.Sprint("file.%s", filename))) } return nil } diff --git a/pkg/adaptor/mongodb.go b/pkg/adaptor/mongodb.go index 0555e2b8c..05baf1527 100644 --- a/pkg/adaptor/mongodb.go +++ b/pkg/adaptor/mongodb.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "fmt" "net" + "regexp" "strings" "sync" "time" @@ -29,8 +30,8 @@ type Mongodb struct { debug bool // save time by setting these once - collection string - database string + collectionMatch *regexp.Regexp + database string oplogTime bson.MongoTimestamp @@ -44,15 +45,21 @@ type Mongodb struct { // a buffer to hold documents buffLock sync.Mutex - opsBuffer []interface{} + opsBufferCount int + opsBuffer map[string][]interface{} opsBufferSize int - bulkWriteChannel chan interface{} + bulkWriteChannel chan *SyncDoc bulkQuitChannel chan chan bool bulk bool restartable bool // this refers to being able to refresh the iterator, not to the restart based on session op } +type SyncDoc struct { + Doc map[string]interface{} + Collection string +} + // NewMongodb creates a new Mongodb adaptor func NewMongodb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { var ( @@ -79,13 +86,14 @@ func NewMongodb(p *pipe.Pipe, path string, extra Config) (StopStartListener, err tail: conf.Tail, debug: conf.Debug, path: path, - opsBuffer: make([]interface{}, 0, MONGO_BUFFER_LEN), - bulkWriteChannel: make(chan interface{}), + opsBuffer: make(map[string][]interface{}), + bulkWriteChannel: make(chan *SyncDoc), bulkQuitChannel: make(chan chan bool), bulk: conf.Bulk, } + // opsBuffer: make([]*SyncDoc, 0, MONGO_BUFFER_LEN), - m.database, m.collection, err = m.splitNamespace(conf.Namespace) + m.database, m.collectionMatch, err = extra.compileNamespace() if err != nil { return m, err } @@ -175,7 +183,7 @@ func (m *Mongodb) Listen() (err error) { if m.bulk { go m.bulkWriter() } - return m.pipe.Listen(m.writeMessage) + return m.pipe.Listen(m.writeMessage, m.collectionMatch) } // Stop the adaptor @@ -196,25 +204,35 @@ func (m *Mongodb) Stop() error { // TODO this can be cleaned up. I'm not sure whether this should pipe the error, or whether the // caller should pipe the error func (m *Mongodb) writeMessage(msg *message.Msg) (*message.Msg, error) { - collection := m.mongoSession.DB(m.database).C(m.collection) + _, msgColl, err := msg.SplitNamespace() + if err != nil { + m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (msg namespace improperly formatted, must be database.collection, got %s)", msg.Namespace), msg.Data) + return msg, nil + } + + collection := m.mongoSession.DB(m.database).C(msgColl) if !msg.IsMap() { m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (document must be a bson document, got %T instead)", msg.Data), msg.Data) return msg, nil } - doc := msg.Map() + doc := &SyncDoc{ + Doc: msg.Map(), + Collection: msgColl, + } + if m.bulk { m.bulkWriteChannel <- doc } else if msg.Op == message.Delete { - err := collection.Remove(doc) + err := collection.Remove(doc.Doc) if err != nil { m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error removing (%s)", err.Error()), msg.Data) } } else { - err := collection.Insert(doc) + err := collection.Insert(doc.Doc) if mgo.IsDup(err) { - err = collection.Update(bson.M{"_id": doc["_id"]}, doc) + err = collection.Update(bson.M{"_id": doc.Doc["_id"]}, doc.Doc) } if err != nil { m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (%s)", err.Error()), msg.Data) @@ -229,18 +247,19 @@ func (m *Mongodb) bulkWriter() { for { select { case doc := <-m.bulkWriteChannel: - sz, err := docSize(doc) + sz, err := docSize(doc.Doc) if err != nil { m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (%s)", err.Error()), doc) break } - if ((sz + m.opsBufferSize) > MONGO_BUFFER_SIZE) || (len(m.opsBuffer) == MONGO_BUFFER_LEN) { + if ((sz + m.opsBufferSize) > MONGO_BUFFER_SIZE) || (m.opsBufferCount == MONGO_BUFFER_LEN) { m.writeBuffer() // send it off to be inserted } m.buffLock.Lock() - m.opsBuffer = append(m.opsBuffer, doc) + m.opsBufferCount += 1 + m.opsBuffer[doc.Collection] = append(m.opsBuffer[doc.Collection], doc.Doc) m.opsBufferSize += sz m.buffLock.Unlock() case <-time.After(2 * time.Second): @@ -255,77 +274,90 @@ func (m *Mongodb) bulkWriter() { func (m *Mongodb) writeBuffer() { m.buffLock.Lock() defer m.buffLock.Unlock() - collection := m.mongoSession.DB(m.database).C(m.collection) - if len(m.opsBuffer) == 0 { - return - } + for coll, docs := range m.opsBuffer { - err := collection.Insert(m.opsBuffer...) + collection := m.mongoSession.DB(m.database).C(coll) + if len(docs) == 0 { + continue + } - if err != nil { - if mgo.IsDup(err) { - err = nil - for _, op := range m.opsBuffer { - e := collection.Insert(op) - if mgo.IsDup(e) { - doc, ok := op.(map[string]interface{}) - if !ok { - m.pipe.Err <- NewError(ERROR, m.path, "mongodb error (Cannot cast document to bson)", op) - } + err := collection.Insert(docs...) - e = collection.Update(bson.M{"_id": doc["_id"]}, doc) - } - if e != nil { - m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (%s)", e.Error()), op) + if err != nil { + if mgo.IsDup(err) { + err = nil + for _, op := range docs { + e := collection.Insert(op) + if mgo.IsDup(e) { + doc, ok := op.(map[string]interface{}) + if !ok { + m.pipe.Err <- NewError(ERROR, m.path, "mongodb error (Cannot cast document to bson)", op) + } + + e = collection.Update(bson.M{"_id": doc["_id"]}, doc) + } + if e != nil { + m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (%s)", e.Error()), op) + } } + } else { + m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (%s)", err.Error()), docs[0]) } - } else { - m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("mongodb error (%s)", err.Error()), m.opsBuffer[0]) } + } - m.opsBuffer = make([]interface{}, 0, MONGO_BUFFER_LEN) + m.opsBufferCount = 0 + m.opsBuffer = make(map[string][]interface{}) m.opsBufferSize = 0 } -// catdata pulls down the original collection +// catdata pulls down the original collections func (m *Mongodb) catData() (err error) { - var ( - collection = m.mongoSession.DB(m.database).C(m.collection) - query = bson.M{} - result bson.M // hold the document - ) + collections, _ := m.mongoSession.DB(m.database).CollectionNames() + for _, collection := range collections { + if strings.HasPrefix(collection, "system.") { + continue + } else if match := m.collectionMatch.MatchString(collection); !match { + continue + } - iter := collection.Find(query).Sort("_id").Iter() + var ( + query = bson.M{} + result bson.M // hold the document + ) - for { - for iter.Next(&result) { - if stop := m.pipe.Stopped; stop { - return - } + iter := m.mongoSession.DB(m.database).C(collection).Find(query).Sort("_id").Iter() - // set up the message - msg := message.NewMsg(message.Insert, result) + for { + for iter.Next(&result) { + if stop := m.pipe.Stopped; stop { + return + } - m.pipe.Send(msg) - result = bson.M{} - } + // set up the message + msg := message.NewMsg(message.Insert, result, m.computeNamespace(collection)) - // we've exited the mongo read loop, lets figure out why - // check here again if we've been asked to quit - if stop := m.pipe.Stopped; stop { - return - } + m.pipe.Send(msg) + result = bson.M{} + } - if iter.Err() != nil && m.restartable { - fmt.Printf("got err reading collection. reissuing query %v\n", iter.Err()) - time.Sleep(1 * time.Second) - iter = collection.Find(query).Sort("_id").Iter() - continue - } + // we've exited the mongo read loop, lets figure out why + // check here again if we've been asked to quit + if stop := m.pipe.Stopped; stop { + return + } - return + if iter.Err() != nil && m.restartable { + fmt.Printf("got err reading collection. reissuing query %v\n", iter.Err()) + time.Sleep(1 * time.Second) + iter = m.mongoSession.DB(m.database).C(collection).Find(query).Sort("_id").Iter() + continue + } + break + } } + return } /* @@ -338,7 +370,6 @@ func (m *Mongodb) tailData() (err error) { result oplogDoc // hold the document query = bson.M{ "ts": bson.M{"$gte": m.oplogTime}, - "ns": m.getNamespace(), } iter = collection.Find(query).LogReplay().Sort("$natural").Tail(m.oplogTimeout) @@ -350,6 +381,13 @@ func (m *Mongodb) tailData() (err error) { return } if result.validOp() { + _, coll, _ := m.splitNamespace(result.Ns) + + if strings.HasPrefix(coll, "system.") { + continue + } else if match := m.collectionMatch.MatchString(coll); !match { + continue + } var doc bson.M switch result.Op { @@ -358,7 +396,7 @@ func (m *Mongodb) tailData() (err error) { case "d": doc = result.O case "u": - doc, err = m.getOriginalDoc(result.O2) + doc, err = m.getOriginalDoc(result.O2, coll) if err != nil { // errors aren't fatal here, but we need to send it down the pipe m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), nil) continue @@ -368,7 +406,7 @@ func (m *Mongodb) tailData() (err error) { continue } - msg := message.NewMsg(message.OpTypeFromString(result.Op), doc) + msg := message.NewMsg(message.OpTypeFromString(result.Op), doc, m.computeNamespace(coll)) msg.Timestamp = int64(result.Ts) >> 32 m.oplogTime = result.Ts @@ -392,7 +430,6 @@ func (m *Mongodb) tailData() (err error) { // query will change, query = bson.M{ "ts": bson.M{"$gte": m.oplogTime}, - "ns": m.getNamespace(), } iter = collection.Find(query).LogReplay().Tail(m.oplogTimeout) } @@ -400,21 +437,21 @@ func (m *Mongodb) tailData() (err error) { // getOriginalDoc retrieves the original document from the database. transport has no knowledge of update operations, all updates // work as wholesale document replaces -func (m *Mongodb) getOriginalDoc(doc bson.M) (result bson.M, err error) { +func (m *Mongodb) getOriginalDoc(doc bson.M, collection string) (result bson.M, err error) { id, exists := doc["_id"] if !exists { return result, fmt.Errorf("can't get _id from document") } - err = m.mongoSession.DB(m.database).C(m.collection).FindId(id).One(&result) + err = m.mongoSession.DB(m.database).C(collection).FindId(id).One(&result) if err != nil { - err = fmt.Errorf("%s %v %v", m.getNamespace(), id, err) + err = fmt.Errorf("%s.%s %v %v", m.database, collection, id, err) } return } -func (m *Mongodb) getNamespace() string { - return strings.Join([]string{m.database, m.collection}, ".") +func (m *Mongodb) computeNamespace(collection string) string { + return strings.Join([]string{m.database, collection}, ".") } // splitNamespace split's a mongo namespace by the first '.' into a database and a collection diff --git a/pkg/adaptor/rethinkdb.go b/pkg/adaptor/rethinkdb.go index afb6861e0..200e80bbc 100644 --- a/pkg/adaptor/rethinkdb.go +++ b/pkg/adaptor/rethinkdb.go @@ -6,6 +6,7 @@ import ( "net/url" "regexp" "strings" + "sync" "time" "github.com/compose/transporter/pkg/message" @@ -21,8 +22,8 @@ type Rethinkdb struct { uri *url.URL // save time by setting these once - database string - table string + database string + tableMatch *regexp.Regexp debug bool tail bool @@ -38,10 +39,9 @@ type Rethinkdb struct { // rethinkDbConfig provides custom configuration options for the RethinkDB adapter type rethinkDbConfig struct { URI string `json:"uri" doc:"the uri to connect to, in the form rethink://user:password@host.example:28015/database"` - Namespace string `json:"namespace" doc:"rethink namespace to read/write, in the form database.table"` + Namespace string `json:"namespace" doc:"rethink namespace to read/write"` Debug bool `json:"debug" doc:"if true, verbose debugging information is displayed"` Tail bool `json:"tail" doc:"if true, the RethinkDB table will be monitored for changes after copying the namespace"` - Timeout int `json:"timeout" doc:"timeout, in seconds, for connect, read, and write operations to the RethinkDB server; default is 10"` } type rethinkDbChangeNotification struct { @@ -78,7 +78,7 @@ func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, e } if conf.Debug { - fmt.Printf("rethinkDbConfig: %#v\n", conf) + fmt.Printf("RethinkDB Config %+v\n", conf) } r := &Rethinkdb{ @@ -88,22 +88,30 @@ func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, e tail: conf.Tail, } - r.database, r.table, err = extra.splitNamespace() + r.database, r.tableMatch, err = extra.compileNamespace() if err != nil { return r, err } r.debug = conf.Debug + if r.debug { + fmt.Printf("tableMatch: %+v\n", r.tableMatch) + } - opts := gorethink.ConnectOpts{ + // test the connection with a timeout + testConn, err := gorethink.Connect(gorethink.ConnectOpts{ Address: r.uri.Host, - MaxIdle: 10, Timeout: time.Second * 10, + }) + if err != nil { + return r, err } - if conf.Timeout > 0 { - opts.Timeout = time.Duration(conf.Timeout) * time.Second - } + testConn.Close() - r.client, err = gorethink.Connect(opts) + // we don't want a timeout here because we want to keep connections open + r.client, err = gorethink.Connect(gorethink.ConnectOpts{ + Address: r.uri.Host, + MaxIdle: 10, + }) if err != nil { return r, err } @@ -162,41 +170,73 @@ func (r *Rethinkdb) assertServerVersion(constraint version.Constraints) error { // Start the adaptor as a source func (r *Rethinkdb) Start() error { - if r.debug { - fmt.Printf("getting a changes cursor\n") + tables, err := gorethink.DB(r.database).TableList().Run(r.client) + if err != nil { + return err } + defer tables.Close() - // Grab a changes cursor before sending all rows. The server will buffer - // changes while we reindex the entire table. - var ccursor *gorethink.Cursor - ccursor, err := gorethink.Table(r.table).Changes().Run(r.client) + var ( + wg sync.WaitGroup + outerr error + table string + ) + for tables.Next(&table) { + if match := r.tableMatch.MatchString(table); !match { + if r.debug { + fmt.Printf("table, %s, didn't match\n", table) + } + continue + } + wg.Add(1) + start := make(chan bool) + if r.tail { + go r.startupChangesForTable(table, start, &wg) + } + go func(table string) { + defer wg.Done() + + if err := r.sendAllDocuments(table); err != nil { + r.pipe.Err <- err + outerr = err + } else if r.tail { + start <- true + } + close(start) + + }(table) + } + wg.Wait() + + return nil +} + +func (r *Rethinkdb) startupChangesForTable(table string, start <-chan bool, wg *sync.WaitGroup) error { + wg.Add(1) + defer wg.Done() + if r.debug { + fmt.Printf("getting a changes cursor for %s\n", table) + } + ccursor, err := gorethink.Table(table).Changes().Run(r.client) if err != nil { r.pipe.Err <- err return err } - defer ccursor.Close() - - if err := r.sendAllDocuments(); err != nil { + // wait until time to start sending changes + <-start + if err := r.sendChanges(table, ccursor); err != nil { r.pipe.Err <- err return err } - - if r.tail { - if err := r.sendChanges(ccursor); err != nil { - r.pipe.Err <- err - return err - } - } - return nil } -func (r *Rethinkdb) sendAllDocuments() error { +func (r *Rethinkdb) sendAllDocuments(table string) error { if r.debug { - fmt.Printf("sending all documents\n") + fmt.Printf("sending all documents for %s\n", table) } - cursor, err := gorethink.Table(r.table).Run(r.client) + cursor, err := gorethink.Table(table).Run(r.client) if err != nil { return err } @@ -208,7 +248,7 @@ func (r *Rethinkdb) sendAllDocuments() error { return nil } - msg := message.NewMsg(message.Insert, r.prepareDocument(doc)) + msg := message.NewMsg(message.Insert, r.prepareDocument(doc), r.computeNamespace(table)) r.pipe.Send(msg) } @@ -219,11 +259,11 @@ func (r *Rethinkdb) sendAllDocuments() error { return nil } -func (r *Rethinkdb) sendChanges(ccursor *gorethink.Cursor) error { +func (r *Rethinkdb) sendChanges(table string, ccursor *gorethink.Cursor) error { + defer ccursor.Close() if r.debug { - fmt.Printf("sending changes\n") + fmt.Printf("sending changes for %s\n", table) } - var change rethinkDbChangeNotification for ccursor.Next(&change) { if stop := r.pipe.Stopped; stop { @@ -238,16 +278,18 @@ func (r *Rethinkdb) sendChanges(ccursor *gorethink.Cursor) error { if change.Error != "" { return errors.New(change.Error) } else if change.OldVal != nil && change.NewVal != nil { - msg = message.NewMsg(message.Update, r.prepareDocument(change.NewVal)) + msg = message.NewMsg(message.Update, r.prepareDocument(change.NewVal), r.computeNamespace(table)) } else if change.NewVal != nil { - msg = message.NewMsg(message.Insert, r.prepareDocument(change.NewVal)) + msg = message.NewMsg(message.Insert, r.prepareDocument(change.NewVal), r.computeNamespace(table)) } else if change.OldVal != nil { - msg = message.NewMsg(message.Delete, r.prepareDocument(change.OldVal)) + msg = message.NewMsg(message.Delete, r.prepareDocument(change.OldVal), r.computeNamespace(table)) } if msg != nil { - fmt.Printf("msg: %#v\n", msg) r.pipe.Send(msg) + if r.debug { + fmt.Printf("msg: %#v\n", msg) + } } } @@ -258,6 +300,10 @@ func (r *Rethinkdb) sendChanges(ccursor *gorethink.Cursor) error { return nil } +func (r *Rethinkdb) computeNamespace(table string) string { + return strings.Join([]string{r.database, table}, ".") +} + // prepareDocument moves the `id` field to the `_id` field, which is more // commonly used by downstream sinks. A transformer could be used to do the // same thing, but because transformers are not run for Delete messages, we @@ -271,8 +317,7 @@ func (r *Rethinkdb) prepareDocument(doc map[string]interface{}) map[string]inter // Listen start's the adaptor's listener func (r *Rethinkdb) Listen() (err error) { - r.recreateTable() - return r.pipe.Listen(r.applyOp) + return r.pipe.Listen(r.applyOp, r.tableMatch) } // Stop the adaptor @@ -288,6 +333,11 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) { err error ) + _, msgTable, err := msg.SplitNamespace() + if err != nil { + r.pipe.Err <- NewError(ERROR, r.path, fmt.Sprintf("rethinkdb error (msg namespace improperly formatted, must be database.table, got %s)", msg.Namespace), msg.Data) + return msg, nil + } if !msg.IsMap() { r.pipe.Err <- NewError(ERROR, r.path, "rethinkdb error (document must be a json document)", msg.Data) return msg, nil @@ -301,11 +351,11 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) { r.pipe.Err <- NewError(ERROR, r.path, "rethinkdb error (cannot delete an object with a nil id)", msg.Data) return msg, nil } - resp, err = gorethink.Table(r.table).Get(id).Delete().RunWrite(r.client) + resp, err = gorethink.Table(msgTable).Get(id).Delete().RunWrite(r.client) case message.Insert: - resp, err = gorethink.Table(r.table).Insert(doc).RunWrite(r.client) + resp, err = gorethink.Table(msgTable).Insert(doc).RunWrite(r.client) case message.Update: - resp, err = gorethink.Table(r.table).Insert(doc, gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.client) + resp, err = gorethink.Table(msgTable).Insert(doc, gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.client) } if err != nil { r.pipe.Err <- NewError(ERROR, r.path, "rethinkdb error (%s)", err) @@ -320,14 +370,6 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) { return msg, nil } -func (r *Rethinkdb) recreateTable() { - if r.debug { - fmt.Printf("dropping and creating table '%s' on database '%s'\n", r.table, r.database) - } - gorethink.DB(r.database).TableDrop(r.table).RunWrite(r.client) - gorethink.DB(r.database).TableCreate(r.table).RunWrite(r.client) -} - // handleresponse takes the rethink response and turn it into something we can consume elsewhere func (r *Rethinkdb) handleResponse(resp *gorethink.WriteResponse) error { if resp.Errors != 0 { diff --git a/pkg/adaptor/transformer.go b/pkg/adaptor/transformer.go index 925390a6b..67fcebfbd 100644 --- a/pkg/adaptor/transformer.go +++ b/pkg/adaptor/transformer.go @@ -3,6 +3,7 @@ package adaptor import ( "fmt" "io/ioutil" + "regexp" "time" "github.com/compose/mejson" @@ -20,6 +21,7 @@ type Transformer struct { pipe *pipe.Pipe path string + ns *regexp.Regexp debug bool script *otto.Script @@ -42,6 +44,11 @@ func NewTransformer(pipe *pipe.Pipe, path string, extra Config) (StopStartListen return t, fmt.Errorf("no filename specified") } + _, t.ns, err = extra.compileNamespace() + if err != nil { + return t, NewError(CRITICAL, path, fmt.Sprintf("can't split transformer namespace (%s)", err.Error()), nil) + } + ba, err := ioutil.ReadFile(conf.Filename) if err != nil { return t, err @@ -60,7 +67,7 @@ func NewTransformer(pipe *pipe.Pipe, path string, extra Config) (StopStartListen // transformers it into mejson, and then uses the supplied javascript module.exports function // to transform the document. The document is then emited to this adaptor's children func (t *Transformer) Listen() (err error) { - return t.pipe.Listen(t.transformOne) + return t.pipe.Listen(t.transformOne, t.ns) } // initEvironment prepares the javascript vm and compiles the transformer script @@ -116,6 +123,7 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) { "data": msg.Data, "ts": msg.Timestamp, "op": msg.Op.String(), + "ns": msg.Namespace, } if msg.IsMap() { if doc, err = mejson.Marshal(msg.Data); err != nil { @@ -164,6 +172,7 @@ func (t *Transformer) toMsg(incoming interface{}, msg *message.Msg) error { case map[string]interface{}: // we're a proper message.Msg, so copy the data over msg.Op = message.OpTypeFromString(newMsg["op"].(string)) msg.Timestamp = newMsg["ts"].(int64) + msg.Namespace = newMsg["ns"].(string) switch data := newMsg["data"].(type) { case otto.Value: @@ -216,7 +225,8 @@ func (t *Transformer) transformerError(lvl ErrorLevel, err error, msg *message.M type TransformerConfig struct { // file containing transformer javascript // must define a module.exports = function(doc) { .....; return doc } - Filename string `json:"filename" doc:"the filename containing the javascript transform fn"` + Filename string `json:"filename" doc:"the filename containing the javascript transform fn"` + Namespace string `json:"namespace" doc:"namespace to transform"` // verbose output Debug bool `json:"debug" doc:"display debug information"` // debug mode diff --git a/pkg/adaptor/transformer_test.go b/pkg/adaptor/transformer_test.go index c2d2260e2..09aca3b99 100644 --- a/pkg/adaptor/transformer_test.go +++ b/pkg/adaptor/transformer_test.go @@ -28,58 +28,65 @@ func TestTransformOne(t *testing.T) { { // just pass through "module.exports=function(doc) { return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}, "database.collection"), false, }, { // delete the 'name' property "module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": "id2", "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": "id2"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id2"}, "database.collection"), false, }, { // delete's should be processed the same "module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }", - message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}), - message.NewMsg(message.Delete, map[string]interface{}{"id": "id2"}), + message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), + message.NewMsg(message.Delete, map[string]interface{}{"id": "id2"}, "database.collection"), false, }, { // delete's and commands should pass through, and the transformer fn shouldn't run "module.exports=function(doc) { return _.omit(doc['data'], ['name']) }", - message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}), - message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}), + message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), + message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), false, }, { // bson should marshal and unmarshal properly "module.exports=function(doc) { return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), false, }, { // we should be able to change the bson "module.exports=function(doc) { doc['data']['id']['$oid'] = '54a4420502a14b9641000001'; return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID2, "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID2, "name": "nick"}, "database.collection"), false, }, { // we should be able to skip a nil message "module.exports=function(doc) { return false }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Noop, map[string]interface{}{"id": bsonID1, "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Noop, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), false, }, { // this throws an error "module.exports=function(doc) { return doc['data']['name'] }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Insert, "nick"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, "nick", "database.collection"), true, }, + { + // we should be able to change the namespace + "module.exports=function(doc) { doc['ns'] = 'database.table'; return doc }", + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.table"), + false, + }, } for _, v := range data { @@ -112,7 +119,7 @@ func BenchmarkTransformOne(b *testing.B) { panic(err) } - msg := message.NewMsg(message.Insert, map[string]interface{}{"id": bson.NewObjectId(), "name": "nick"}) + msg := message.NewMsg(message.Insert, map[string]interface{}{"id": bson.NewObjectId(), "name": "nick"}, "database.collection") b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/pkg/message/message.go b/pkg/message/message.go index ab6f7b6f4..0c4f594ee 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -8,6 +8,8 @@ package message import ( "fmt" + "regexp" + "strings" "time" "gopkg.in/mgo.v2/bson" @@ -20,20 +22,40 @@ type Msg struct { Timestamp int64 Op OpType Data interface{} + Namespace string } // NewMsg returns a new Msg with the ID extracted // from the original document -func NewMsg(op OpType, data interface{}) *Msg { +func NewMsg(op OpType, data interface{}, namespace string) *Msg { m := &Msg{ Timestamp: time.Now().Unix(), Op: op, Data: data, + Namespace: namespace, } return m } +func (m *Msg) MatchNamespace(nsFilter *regexp.Regexp) (bool, error) { + _, ns, err := m.SplitNamespace() + if err != nil { + return false, err + } + + return nsFilter.MatchString(ns), nil +} + +func (m *Msg) SplitNamespace() (string, string, error) { + fields := strings.SplitN(m.Namespace, ".", 2) + + if len(fields) != 2 { + return "", "", fmt.Errorf("malformed msg namespace") + } + return fields[0], fields[1], nil +} + // IsMap returns a bool indicating whether or not the msg.Data is maplike, i.e. a map[string]interface // or a bson.M func (m *Msg) IsMap() bool { diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 22ff1eae7..06c5e79bc 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -46,7 +46,7 @@ func TestIdString(t *testing.T) { } for _, v := range data { - msg := NewMsg(OpTypeFromString("insert"), v.in) + msg := NewMsg(OpTypeFromString("insert"), v.in, "database.collection") id, err := msg.IDString(v.key) if (err != nil) != v.err { t.Errorf("expected error: %t, but got error: %v", v.err, err) diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go index e53616385..76efb8d35 100644 --- a/pkg/pipe/pipe.go +++ b/pkg/pipe/pipe.go @@ -7,6 +7,7 @@ package pipe import ( + "regexp" "time" "github.com/compose/transporter/pkg/events" @@ -65,7 +66,7 @@ func NewPipe(pipe *Pipe, path string) *Pipe { // Listen starts a listening loop that pulls messages from the In chan, applies fn(msg), a `func(message.Msg) error`, and emits them on the Out channel. // Errors will be emited to the Pipe's Err chan, and will terminate the loop. // The listening loop can be interupted by calls to Stop(). -func (m *Pipe) Listen(fn func(*message.Msg) (*message.Msg, error)) error { +func (m *Pipe) Listen(fn func(*message.Msg) (*message.Msg, error), nsFilter *regexp.Regexp) error { if m.In == nil { return nil } @@ -84,19 +85,26 @@ func (m *Pipe) Listen(fn func(*message.Msg) (*message.Msg, error)) error { select { case msg := <-m.In: + if match, err := msg.MatchNamespace(nsFilter); !match || err != nil { + if err != nil { + m.Err <- err + return err + } - outmsg, err := fn(msg) - if err != nil { - m.Err <- err - return err - } - if skipMsg(outmsg) { - break - } - if len(m.Out) > 0 { - m.Send(outmsg) } else { - m.MessageCount++ // update the count anyway + outmsg, err := fn(msg) + if err != nil { + m.Err <- err + return err + } + if skipMsg(outmsg) { + break + } + if len(m.Out) > 0 { + m.Send(outmsg) + } else { + m.MessageCount++ // update the count anyway + } } case <-time.After(100 * time.Millisecond): // NOP, just breath diff --git a/pkg/transporter/node_test.go b/pkg/transporter/node_test.go index ce4108bba..79a9ba11b 100644 --- a/pkg/transporter/node_test.go +++ b/pkg/transporter/node_test.go @@ -16,8 +16,8 @@ func TestNodeString(t *testing.T) { " - Source: ", }, { - NewNode("name", "mongodb", adaptor.Config{"uri": "uri", "namespace": "ns", "debug": false}), - " - Source: name mongodb ns uri", + NewNode("name", "mongodb", adaptor.Config{"uri": "uri", "namespace": "db.col", "debug": false}), + " - Source: name mongodb db.col uri", }, } diff --git a/pkg/transporter/pipeline_events_integration_test.go b/pkg/transporter/pipeline_events_integration_test.go index 4f04dfe68..0aef8e45c 100644 --- a/pkg/transporter/pipeline_events_integration_test.go +++ b/pkg/transporter/pipeline_events_integration_test.go @@ -71,7 +71,10 @@ func TestEventsBroadcast(t *testing.T) { t.FailNow() } - p.Run() + err = p.Run() + if err != nil { + t.FailNow() + } time.Sleep(time.Duration(5) * time.Second) diff --git a/pkg/transporter/pipeline_integration_test.go b/pkg/transporter/pipeline_integration_test.go index 889e4a1f3..1ffc192e3 100644 --- a/pkg/transporter/pipeline_integration_test.go +++ b/pkg/transporter/pipeline_integration_test.go @@ -34,7 +34,7 @@ func setupFiles(in, out string) { func setupMongo() { // setup mongo mongoSess, _ := mgo.Dial(mongoUri) - collection := mongoSess.DB("test").C("outColl") + collection := mongoSess.DB("testOut").C("coll") collection.DropCollection() for i := 0; i <= 5; i += 1 { @@ -43,7 +43,7 @@ func setupMongo() { mongoSess.Close() mongoSess, _ = mgo.Dial(mongoUri) - collection = mongoSess.DB("test").C("inColl") + collection = mongoSess.DB("testIn").C("coll") collection.DropCollection() mongoSess.Close() } @@ -99,8 +99,8 @@ func TestMongoToMongo(t *testing.T) { setupMongo() var ( - inNs = "test.inColl" - outNs = "test.outColl" + inNs = "testIn.coll" + outNs = "testOut.coll" ) // create the source node and attach our sink @@ -128,8 +128,8 @@ func TestMongoToMongo(t *testing.T) { } defer mongoSess.Close() - collOut := mongoSess.DB("test").C("outColl") - collIn := mongoSess.DB("test").C("inColl") + collOut := mongoSess.DB("testOut").C("coll") + collIn := mongoSess.DB("testIn").C("coll") // are the counts the same? outCount, _ := collOut.Count() @@ -155,7 +155,7 @@ func TestMongoToMongo(t *testing.T) { } // clean up - mongoSess.DB("test").C("outColl").DropCollection() - mongoSess.DB("test").C("inColl").DropCollection() + mongoSess.DB("testOut").C("coll").DropCollection() + mongoSess.DB("testIn").C("coll").DropCollection() }