From 3aa2342fe91958141ac84b25ce41378e62f90372 Mon Sep 17 00:00:00 2001 From: JP Date: Tue, 21 Jul 2015 15:44:45 -0500 Subject: [PATCH 1/3] update transformer with namespace, updated/fixed tests --- pkg/adaptor/transformer.go | 4 +++- pkg/adaptor/transformer_test.go | 41 +++++++++++++++++++-------------- pkg/message/message_test.go | 2 +- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/pkg/adaptor/transformer.go b/pkg/adaptor/transformer.go index 3dd83ad5f..3982bf933 100644 --- a/pkg/adaptor/transformer.go +++ b/pkg/adaptor/transformer.go @@ -67,7 +67,7 @@ func NewTransformer(pipe *pipe.Pipe, path string, extra Config) (StopStartListen // transformers it into mejson, and then uses the supplied javascript module.exports function // to transform the document. The document is then emited to this adaptor's children func (t *Transformer) Listen() (err error) { - return t.pipe.Listen(t.transformOne) + return t.pipe.Listen(t.ns, t.transformOne) } // initEvironment prepares the javascript vm and compiles the transformer script @@ -123,6 +123,7 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) { "data": msg.Data, "ts": msg.Timestamp, "op": msg.Op.String(), + "ns": msg.Namespace, } if msg.IsMap() { if doc, err = mejson.Marshal(msg.Data); err != nil { @@ -171,6 +172,7 @@ func (t *Transformer) toMsg(incoming interface{}, msg *message.Msg) error { case map[string]interface{}: // we're a proper message.Msg, so copy the data over msg.Op = message.OpTypeFromString(newMsg["op"].(string)) msg.Timestamp = newMsg["ts"].(int64) + msg.Namespace = newMsg["ns"].(string) switch data := newMsg["data"].(type) { case otto.Value: diff --git a/pkg/adaptor/transformer_test.go b/pkg/adaptor/transformer_test.go index c2d2260e2..09aca3b99 100644 --- a/pkg/adaptor/transformer_test.go +++ b/pkg/adaptor/transformer_test.go @@ -28,58 +28,65 @@ func TestTransformOne(t *testing.T) { { // just pass through "module.exports=function(doc) { return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id1", "name": "nick"}, "database.collection"), false, }, { // delete the 'name' property "module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": "id2", "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": "id2"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": "id2"}, "database.collection"), false, }, { // delete's should be processed the same "module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }", - message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}), - message.NewMsg(message.Delete, map[string]interface{}{"id": "id2"}), + message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), + message.NewMsg(message.Delete, map[string]interface{}{"id": "id2"}, "database.collection"), false, }, { // delete's and commands should pass through, and the transformer fn shouldn't run "module.exports=function(doc) { return _.omit(doc['data'], ['name']) }", - message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}), - message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}), + message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), + message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}, "database.collection"), false, }, { // bson should marshal and unmarshal properly "module.exports=function(doc) { return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), false, }, { // we should be able to change the bson "module.exports=function(doc) { doc['data']['id']['$oid'] = '54a4420502a14b9641000001'; return doc }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID2, "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID2, "name": "nick"}, "database.collection"), false, }, { // we should be able to skip a nil message "module.exports=function(doc) { return false }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Noop, map[string]interface{}{"id": bsonID1, "name": "nick"}), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Noop, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), false, }, { // this throws an error "module.exports=function(doc) { return doc['data']['name'] }", - message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), - message.NewMsg(message.Insert, "nick"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, "nick", "database.collection"), true, }, + { + // we should be able to change the namespace + "module.exports=function(doc) { doc['ns'] = 'database.table'; return doc }", + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.collection"), + message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}, "database.table"), + false, + }, } for _, v := range data { @@ -112,7 +119,7 @@ func BenchmarkTransformOne(b *testing.B) { panic(err) } - msg := message.NewMsg(message.Insert, map[string]interface{}{"id": bson.NewObjectId(), "name": "nick"}) + msg := message.NewMsg(message.Insert, map[string]interface{}{"id": bson.NewObjectId(), "name": "nick"}, "database.collection") b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 22ff1eae7..06c5e79bc 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -46,7 +46,7 @@ func TestIdString(t *testing.T) { } for _, v := range data { - msg := NewMsg(OpTypeFromString("insert"), v.in) + msg := NewMsg(OpTypeFromString("insert"), v.in, "database.collection") id, err := msg.IDString(v.key) if (err != nil) != v.err { t.Errorf("expected error: %t, but got error: %v", v.err, err) From b5ab4dc953b1665bd52f4741f536ca97bc9c834d Mon Sep 17 00:00:00 2001 From: JP Date: Tue, 21 Jul 2015 16:39:34 -0500 Subject: [PATCH 2/3] fix bugs found in tests --- pkg/adaptor/file.go | 5 ++--- pkg/transporter/node_test.go | 4 ++-- .../pipeline_events_integration_test.go | 5 ++++- pkg/transporter/pipeline_integration_test.go | 16 ++++++++-------- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pkg/adaptor/file.go b/pkg/adaptor/file.go index 7e205fdc2..0263e94db 100644 --- a/pkg/adaptor/file.go +++ b/pkg/adaptor/file.go @@ -16,7 +16,6 @@ import ( // source / sink for file's on disk, as well as a sink to stdout. type File struct { uri string - namespace string pipe *pipe.Pipe path string filehandle *os.File @@ -64,7 +63,7 @@ func (d *File) Listen() (err error) { } } - return d.pipe.Listen(regexp.MustCompile(`.*/`), d.dumpMessage) + return d.pipe.Listen(regexp.MustCompile(`.*`), d.dumpMessage) } // Stop the adaptor @@ -91,7 +90,7 @@ func (d *File) readFile() (err error) { d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't marshal document (%s)", err.Error()), nil) return err } - d.pipe.Send(message.NewMsg(message.Insert, doc, "")) + d.pipe.Send(message.NewMsg(message.Insert, doc, fmt.Sprint("file.%s", filename))) } return nil } diff --git a/pkg/transporter/node_test.go b/pkg/transporter/node_test.go index ce4108bba..79a9ba11b 100644 --- a/pkg/transporter/node_test.go +++ b/pkg/transporter/node_test.go @@ -16,8 +16,8 @@ func TestNodeString(t *testing.T) { " - Source: ", }, { - NewNode("name", "mongodb", adaptor.Config{"uri": "uri", "namespace": "ns", "debug": false}), - " - Source: name mongodb ns uri", + NewNode("name", "mongodb", adaptor.Config{"uri": "uri", "namespace": "db.col", "debug": false}), + " - Source: name mongodb db.col uri", }, } diff --git a/pkg/transporter/pipeline_events_integration_test.go b/pkg/transporter/pipeline_events_integration_test.go index 4f04dfe68..0aef8e45c 100644 --- a/pkg/transporter/pipeline_events_integration_test.go +++ b/pkg/transporter/pipeline_events_integration_test.go @@ -71,7 +71,10 @@ func TestEventsBroadcast(t *testing.T) { t.FailNow() } - p.Run() + err = p.Run() + if err != nil { + t.FailNow() + } time.Sleep(time.Duration(5) * time.Second) diff --git a/pkg/transporter/pipeline_integration_test.go b/pkg/transporter/pipeline_integration_test.go index 889e4a1f3..1ffc192e3 100644 --- a/pkg/transporter/pipeline_integration_test.go +++ b/pkg/transporter/pipeline_integration_test.go @@ -34,7 +34,7 @@ func setupFiles(in, out string) { func setupMongo() { // setup mongo mongoSess, _ := mgo.Dial(mongoUri) - collection := mongoSess.DB("test").C("outColl") + collection := mongoSess.DB("testOut").C("coll") collection.DropCollection() for i := 0; i <= 5; i += 1 { @@ -43,7 +43,7 @@ func setupMongo() { mongoSess.Close() mongoSess, _ = mgo.Dial(mongoUri) - collection = mongoSess.DB("test").C("inColl") + collection = mongoSess.DB("testIn").C("coll") collection.DropCollection() mongoSess.Close() } @@ -99,8 +99,8 @@ func TestMongoToMongo(t *testing.T) { setupMongo() var ( - inNs = "test.inColl" - outNs = "test.outColl" + inNs = "testIn.coll" + outNs = "testOut.coll" ) // create the source node and attach our sink @@ -128,8 +128,8 @@ func TestMongoToMongo(t *testing.T) { } defer mongoSess.Close() - collOut := mongoSess.DB("test").C("outColl") - collIn := mongoSess.DB("test").C("inColl") + collOut := mongoSess.DB("testOut").C("coll") + collIn := mongoSess.DB("testIn").C("coll") // are the counts the same? outCount, _ := collOut.Count() @@ -155,7 +155,7 @@ func TestMongoToMongo(t *testing.T) { } // clean up - mongoSess.DB("test").C("outColl").DropCollection() - mongoSess.DB("test").C("inColl").DropCollection() + mongoSess.DB("testOut").C("coll").DropCollection() + mongoSess.DB("testIn").C("coll").DropCollection() } From e8d2f4c17511b5fef1ca48f9fe8bc08328c8e3c6 Mon Sep 17 00:00:00 2001 From: JP Date: Sun, 26 Jul 2015 20:37:49 -0500 Subject: [PATCH 3/3] reverse listen params order --- pkg/adaptor/elasticsearch.go | 2 +- pkg/adaptor/file.go | 2 +- pkg/adaptor/mongodb.go | 2 +- pkg/adaptor/rethinkdb.go | 2 +- pkg/adaptor/transformer.go | 2 +- pkg/pipe/pipe.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/adaptor/elasticsearch.go b/pkg/adaptor/elasticsearch.go index 6628d6665..914bd9080 100644 --- a/pkg/adaptor/elasticsearch.go +++ b/pkg/adaptor/elasticsearch.go @@ -81,7 +81,7 @@ func (e *Elasticsearch) Listen() error { } }() - return e.pipe.Listen(e.typeMatch, e.applyOp) + return e.pipe.Listen(e.applyOp, e.typeMatch) } // Stop the adaptor diff --git a/pkg/adaptor/file.go b/pkg/adaptor/file.go index 0263e94db..73eabf5f7 100644 --- a/pkg/adaptor/file.go +++ b/pkg/adaptor/file.go @@ -63,7 +63,7 @@ func (d *File) Listen() (err error) { } } - return d.pipe.Listen(regexp.MustCompile(`.*`), d.dumpMessage) + return d.pipe.Listen(d.dumpMessage, regexp.MustCompile(`.*`)) } // Stop the adaptor diff --git a/pkg/adaptor/mongodb.go b/pkg/adaptor/mongodb.go index 0ed8cb835..05baf1527 100644 --- a/pkg/adaptor/mongodb.go +++ b/pkg/adaptor/mongodb.go @@ -183,7 +183,7 @@ func (m *Mongodb) Listen() (err error) { if m.bulk { go m.bulkWriter() } - return m.pipe.Listen(m.collectionMatch, m.writeMessage) + return m.pipe.Listen(m.writeMessage, m.collectionMatch) } // Stop the adaptor diff --git a/pkg/adaptor/rethinkdb.go b/pkg/adaptor/rethinkdb.go index 9ac7f6ebc..200e80bbc 100644 --- a/pkg/adaptor/rethinkdb.go +++ b/pkg/adaptor/rethinkdb.go @@ -317,7 +317,7 @@ func (r *Rethinkdb) prepareDocument(doc map[string]interface{}) map[string]inter // Listen start's the adaptor's listener func (r *Rethinkdb) Listen() (err error) { - return r.pipe.Listen(r.tableMatch, r.applyOp) + return r.pipe.Listen(r.applyOp, r.tableMatch) } // Stop the adaptor diff --git a/pkg/adaptor/transformer.go b/pkg/adaptor/transformer.go index 3982bf933..67fcebfbd 100644 --- a/pkg/adaptor/transformer.go +++ b/pkg/adaptor/transformer.go @@ -67,7 +67,7 @@ func NewTransformer(pipe *pipe.Pipe, path string, extra Config) (StopStartListen // transformers it into mejson, and then uses the supplied javascript module.exports function // to transform the document. The document is then emited to this adaptor's children func (t *Transformer) Listen() (err error) { - return t.pipe.Listen(t.ns, t.transformOne) + return t.pipe.Listen(t.transformOne, t.ns) } // initEvironment prepares the javascript vm and compiles the transformer script diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go index 8419547d8..76efb8d35 100644 --- a/pkg/pipe/pipe.go +++ b/pkg/pipe/pipe.go @@ -66,7 +66,7 @@ func NewPipe(pipe *Pipe, path string) *Pipe { // Listen starts a listening loop that pulls messages from the In chan, applies fn(msg), a `func(message.Msg) error`, and emits them on the Out channel. // Errors will be emited to the Pipe's Err chan, and will terminate the loop. // The listening loop can be interupted by calls to Stop(). -func (m *Pipe) Listen(nsFilter *regexp.Regexp, fn func(*message.Msg) (*message.Msg, error)) error { +func (m *Pipe) Listen(fn func(*message.Msg) (*message.Msg, error), nsFilter *regexp.Regexp) error { if m.In == nil { return nil }