diff --git a/pkg/adaptor/transformer.go b/pkg/adaptor/transformer.go index ebb29da54..79cc104a6 100644 --- a/pkg/adaptor/transformer.go +++ b/pkg/adaptor/transformer.go @@ -107,21 +107,25 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) { ) // short circuit for deletes and commands - if msg.Op == message.Delete || msg.Op == message.Command { + if msg.Op == message.Command { return msg, nil } now := time.Now().Nanosecond() + fullDoc := map[string]interface{}{ + "data": msg.Data, + "ts": msg.Timestamp, + "op": msg.Op.String(), + } if msg.IsMap() { if doc, err = mejson.Marshal(msg.Data); err != nil { t.pipe.Err <- t.transformerError(ERROR, err, msg) return msg, nil } - } else { - doc = msg.Data + fullDoc["data"] = doc } - if value, err = t.vm.ToValue(doc); err != nil { + if value, err = t.vm.ToValue(fullDoc); err != nil { t.pipe.Err <- t.transformerError(ERROR, err, msg) return msg, nil } @@ -141,16 +145,36 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) { afterVM := time.Now().Nanosecond() - switch r := result.(type) { + fullDoc, ok := result.(map[string]interface{}) + if !ok { + t.pipe.Err <- t.transformerError(ERROR, fmt.Errorf("returned doc was not a map[string]interface{}"), msg) + return msg, fmt.Errorf("returned doc was not a map[string]interface{}") + } + + msg.Op = message.OpTypeFromString(fullDoc["op"].(string)) + msg.Timestamp = fullDoc["ts"].(int64) + switch data := fullDoc["data"].(type) { + case otto.Value: + exported, err := data.Export() + if err != nil { + t.pipe.Err <- t.transformerError(ERROR, err, msg) + return msg, nil + } + d, err := mejson.Unmarshal(exported.(map[string]interface{})) + if err != nil { + t.pipe.Err <- t.transformerError(ERROR, err, msg) + return msg, nil + } + msg.Data = map[string]interface{}(d) case map[string]interface{}: - doc, err := mejson.Unmarshal(r) + d, err := mejson.Unmarshal(data) if err != nil { t.pipe.Err <- t.transformerError(ERROR, err, msg) return msg, nil } - msg.Data = map[string]interface{}(doc) + msg.Data = map[string]interface{}(d) default: - msg.Data = r + msg.Data = data } if t.debug { diff --git a/pkg/adaptor/transformer_test.go b/pkg/adaptor/transformer_test.go index 610a8dd80..6816458c2 100644 --- a/pkg/adaptor/transformer_test.go +++ b/pkg/adaptor/transformer_test.go @@ -34,21 +34,21 @@ func TestTransformOne(t *testing.T) { }, { // delete the 'name' property - "module.exports=function(doc) { return _.omit(doc, ['name']) }", + "module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }", message.NewMsg(message.Insert, map[string]interface{}{"id": "id2", "name": "nick"}), message.NewMsg(message.Insert, map[string]interface{}{"id": "id2"}), false, }, { - // delete's and commands should pass through, and the transformer fn shouldn't run - "module.exports=function(doc) { return _.omit(doc, ['name']) }", - message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}), + // delete's should be processed the same + "module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }", message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}), + message.NewMsg(message.Delete, map[string]interface{}{"id": "id2"}), false, }, { // delete's and commands should pass through, and the transformer fn shouldn't run - "module.exports=function(doc) { return _.omit(doc, ['name']) }", + "module.exports=function(doc) { return _.omit(doc['data'], ['name']) }", message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}), message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}), false, @@ -62,17 +62,17 @@ func TestTransformOne(t *testing.T) { }, { // we should be able to change the bson - "module.exports=function(doc) { doc['id']['$oid'] = '54a4420502a14b9641000001'; return doc }", + "module.exports=function(doc) { doc['data']['id']['$oid'] = '54a4420502a14b9641000001'; return doc }", message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID2, "name": "nick"}), false, }, { - // we should be able to change the bson - "module.exports=function(doc) { return doc['name'] }", + // this throws an error + "module.exports=function(doc) { return doc['data']['name'] }", message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}), message.NewMsg(message.Insert, "nick"), - false, + true, }, } for _, v := range data { @@ -88,7 +88,7 @@ func TestTransformOne(t *testing.T) { t.Errorf("error expected %t but actually got %v", v.err, err) continue } - if !reflect.DeepEqual(msg.Data, v.out.Data) || err != nil { + if (!reflect.DeepEqual(msg.Data, v.out.Data) || err != nil) && !v.err { t.Errorf("expected:\n(%T) %+v\ngot:\n(%T) %+v with error (%v)\n", v.out.Data, v.out.Data, msg.Data, msg.Data, err) } }